remove some trash

This commit is contained in:
Alexander Tokmakov 2021-09-15 21:06:20 +03:00
parent 2bf47bb0ba
commit 3bca886174
11 changed files with 411 additions and 239 deletions

View File

@ -534,6 +534,13 @@ ExecutionStatus ExecutionStatus::fromCurrentException(const std::string & start_
return ExecutionStatus(getCurrentExceptionCode(), msg);
}
/// Creates an ExecutionStatus from its textual representation.
/// The text is handed to deserializeText(); whatever that call throws propagates to the caller.
ExecutionStatus ExecutionStatus::fromText(const std::string & data)
{
    ExecutionStatus parsed;
    parsed.deserializeText(data);
    return parsed;
}
ParsingException::ParsingException() = default;
ParsingException::ParsingException(const std::string & msg, int code)
: Exception(msg, code)

View File

@ -184,6 +184,8 @@ struct ExecutionStatus
static ExecutionStatus fromCurrentException(const std::string & start_of_message = "");
static ExecutionStatus fromText(const std::string & data);
std::string serializeText() const;
void deserializeText(const std::string & data);

View File

@ -1,6 +1,5 @@
#include <Common/SettingsChanges.h>
namespace DB
{
namespace

View File

@ -5,6 +5,9 @@
namespace DB
{
class IColumn;
struct SettingChange
{
String name;

View File

@ -772,7 +772,9 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
String shard_path = task.getShardNodePath();
String is_executed_path = fs::path(shard_path) / "executed";
String tries_to_execute_path = fs::path(shard_path) / "tries_to_execute";
zookeeper->createAncestors(fs::path(shard_path) / ""); /* appends "/" at the end of shard_path */
assert(shard_path.starts_with(String(fs::path(task.entry_path) / "shards" / "")));
zookeeper->createIfNotExists(fs::path(task.entry_path) / "shards", "");
zookeeper->createIfNotExists(shard_path, "");
/// Leader replica creates is_executed_path node on successful query execution.
/// We will remove create_shard_flag from zk operations list, if current replica is just waiting for leader to execute the query.

View File

@ -19,6 +19,41 @@
namespace DB
{
/// Returns an Enum8 data type that maps the name of every Coordination::Error listed below
/// to its numeric code (narrowed to Int8).
DataTypePtr getCoordinationErrorCodesEnumType()
{
    /// Name -> code table; each name is spelled exactly like the corresponding enumerator.
    DataTypeEnum8::Values error_codes
    {
        {"ZOK",                         static_cast<Int8>(Coordination::Error::ZOK)},

        {"ZSYSTEMERROR",                static_cast<Int8>(Coordination::Error::ZSYSTEMERROR)},
        {"ZRUNTIMEINCONSISTENCY",       static_cast<Int8>(Coordination::Error::ZRUNTIMEINCONSISTENCY)},
        {"ZDATAINCONSISTENCY",          static_cast<Int8>(Coordination::Error::ZDATAINCONSISTENCY)},
        {"ZCONNECTIONLOSS",             static_cast<Int8>(Coordination::Error::ZCONNECTIONLOSS)},
        {"ZMARSHALLINGERROR",           static_cast<Int8>(Coordination::Error::ZMARSHALLINGERROR)},
        {"ZUNIMPLEMENTED",              static_cast<Int8>(Coordination::Error::ZUNIMPLEMENTED)},
        {"ZOPERATIONTIMEOUT",           static_cast<Int8>(Coordination::Error::ZOPERATIONTIMEOUT)},
        {"ZBADARGUMENTS",               static_cast<Int8>(Coordination::Error::ZBADARGUMENTS)},
        {"ZINVALIDSTATE",               static_cast<Int8>(Coordination::Error::ZINVALIDSTATE)},

        {"ZAPIERROR",                   static_cast<Int8>(Coordination::Error::ZAPIERROR)},
        {"ZNONODE",                     static_cast<Int8>(Coordination::Error::ZNONODE)},
        {"ZNOAUTH",                     static_cast<Int8>(Coordination::Error::ZNOAUTH)},
        {"ZBADVERSION",                 static_cast<Int8>(Coordination::Error::ZBADVERSION)},
        {"ZNOCHILDRENFOREPHEMERALS",    static_cast<Int8>(Coordination::Error::ZNOCHILDRENFOREPHEMERALS)},
        {"ZNODEEXISTS",                 static_cast<Int8>(Coordination::Error::ZNODEEXISTS)},
        {"ZNOTEMPTY",                   static_cast<Int8>(Coordination::Error::ZNOTEMPTY)},
        {"ZSESSIONEXPIRED",             static_cast<Int8>(Coordination::Error::ZSESSIONEXPIRED)},
        {"ZINVALIDCALLBACK",            static_cast<Int8>(Coordination::Error::ZINVALIDCALLBACK)},
        {"ZINVALIDACL",                 static_cast<Int8>(Coordination::Error::ZINVALIDACL)},
        {"ZAUTHFAILED",                 static_cast<Int8>(Coordination::Error::ZAUTHFAILED)},
        {"ZCLOSING",                    static_cast<Int8>(Coordination::Error::ZCLOSING)},
        {"ZNOTHING",                    static_cast<Int8>(Coordination::Error::ZNOTHING)},
        {"ZSESSIONMOVED",               static_cast<Int8>(Coordination::Error::ZSESSIONMOVED)},
    };

    return std::make_shared<DataTypeEnum8>(std::move(error_codes));
}
NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
{
auto type_enum = std::make_shared<DataTypeEnum8>(
@ -52,36 +87,7 @@ NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
{"SessionID", static_cast<Int16>(Coordination::OpNum::SessionID)},
});
auto error_enum = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"ZOK", static_cast<Int8>(Coordination::Error::ZOK)},
{"ZSYSTEMERROR", static_cast<Int8>(Coordination::Error::ZSYSTEMERROR)},
{"ZRUNTIMEINCONSISTENCY", static_cast<Int8>(Coordination::Error::ZRUNTIMEINCONSISTENCY)},
{"ZDATAINCONSISTENCY", static_cast<Int8>(Coordination::Error::ZDATAINCONSISTENCY)},
{"ZCONNECTIONLOSS", static_cast<Int8>(Coordination::Error::ZCONNECTIONLOSS)},
{"ZMARSHALLINGERROR", static_cast<Int8>(Coordination::Error::ZMARSHALLINGERROR)},
{"ZUNIMPLEMENTED", static_cast<Int8>(Coordination::Error::ZUNIMPLEMENTED)},
{"ZOPERATIONTIMEOUT", static_cast<Int8>(Coordination::Error::ZOPERATIONTIMEOUT)},
{"ZBADARGUMENTS", static_cast<Int8>(Coordination::Error::ZBADARGUMENTS)},
{"ZINVALIDSTATE", static_cast<Int8>(Coordination::Error::ZINVALIDSTATE)},
{"ZAPIERROR", static_cast<Int8>(Coordination::Error::ZAPIERROR)},
{"ZNONODE", static_cast<Int8>(Coordination::Error::ZNONODE)},
{"ZNOAUTH", static_cast<Int8>(Coordination::Error::ZNOAUTH)},
{"ZBADVERSION", static_cast<Int8>(Coordination::Error::ZBADVERSION)},
{"ZNOCHILDRENFOREPHEMERALS", static_cast<Int8>(Coordination::Error::ZNOCHILDRENFOREPHEMERALS)},
{"ZNODEEXISTS", static_cast<Int8>(Coordination::Error::ZNODEEXISTS)},
{"ZNOTEMPTY", static_cast<Int8>(Coordination::Error::ZNOTEMPTY)},
{"ZSESSIONEXPIRED", static_cast<Int8>(Coordination::Error::ZSESSIONEXPIRED)},
{"ZINVALIDCALLBACK", static_cast<Int8>(Coordination::Error::ZINVALIDCALLBACK)},
{"ZINVALIDACL", static_cast<Int8>(Coordination::Error::ZINVALIDACL)},
{"ZAUTHFAILED", static_cast<Int8>(Coordination::Error::ZAUTHFAILED)},
{"ZCLOSING", static_cast<Int8>(Coordination::Error::ZCLOSING)},
{"ZNOTHING", static_cast<Int8>(Coordination::Error::ZNOTHING)},
{"ZSESSIONMOVED", static_cast<Int8>(Coordination::Error::ZSESSIONMOVED)},
});
auto error_enum = getCoordinationErrorCodesEnumType();
auto watch_type_enum = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values

View File

@ -73,4 +73,6 @@ class ZooKeeperLog : public SystemLog<ZooKeeperLogElement>
using SystemLog<ZooKeeperLogElement>::SystemLog;
};
DataTypePtr getCoordinationErrorCodesEnumType();
}

View File

@ -1,23 +1,14 @@
#include <algorithm>
#include <filesystem>
#include "StorageSystemDDLWorkerQueue.h"
#include <Columns/ColumnArray.h>
#include <Storages/System/StorageSystemDDLWorkerQueue.h>
#include <Interpreters/DDLTask.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeMap.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
#include <Storages/SelectQueryInfo.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
@ -25,207 +16,334 @@
namespace fs = std::filesystem;
enum Status
{
ACTIVE,
FINISHED,
UNKNOWN,
ERRORED
};
namespace DB
{
std::vector<std::pair<String, Int8>> getStatusEnumsAndValues()
/// Per-host state of a distributed DDL task as reported in the `status` column.
/// The numeric values are the ones stored in the Enum8 column, so they are spelled out explicitly.
enum class Status
{
    /// Host is named in the task entry but was found in neither the "finished" nor the "active" listing.
    INACTIVE = 0,
    /// Host is listed under the task's "active" node and was not already reported as finished.
    ACTIVE = 1,
    /// Host has a node under the task's "finished" node and its execution status was read.
    FINISHED = 2,
    /// The task (or the host's finished node) is gone or is being removed while we were reading it.
    REMOVING = 3,
    /// No host is known for the task at all.
    UNKNOWN = 4,
};
using GetResponseFuture = std::future<Coordination::GetResponse>;
using ListResponseFuture = std::future<Coordination::ListResponse>;
using GetResponseFutures = std::vector<GetResponseFuture>;
using ListResponseFutures = std::vector<ListResponseFuture>;
static std::vector<std::pair<String, Int8>> getStatusEnumsAndValues()
{
return std::vector<std::pair<String, Int8>>{
{"Active", static_cast<Int8>(Status::ACTIVE)},
{"Finished", static_cast<Int8>(Status::FINISHED)},
{"Unknown", static_cast<Int8>(Status::UNKNOWN)},
{"Errored", static_cast<Int8>(Status::ERRORED)},
{"Inactive", static_cast<Int8>(Status::INACTIVE)},
{"Active", static_cast<Int8>(Status::ACTIVE)},
{"Finished", static_cast<Int8>(Status::FINISHED)},
{"Removing", static_cast<Int8>(Status::REMOVING)},
{"Unknown", static_cast<Int8>(Status::UNKNOWN)},
};
}
std::vector<std::pair<String, Int8>> getZooKeeperErrorEnumsAndValues()
{
return std::vector<std::pair<String, Int8>>{
{"ZOK", static_cast<Int8>(Coordination::Error::ZOK)},
{"ZSYSTEMERROR", static_cast<Int8>(Coordination::Error::ZSYSTEMERROR)},
{"ZRUNTIMEINCONSISTENCY", static_cast<Int8>(Coordination::Error::ZRUNTIMEINCONSISTENCY)},
{"ZDATAINCONSISTENCY", static_cast<Int8>(Coordination::Error::ZDATAINCONSISTENCY)},
{"ZCONNECTIONLOSS", static_cast<Int8>(Coordination::Error::ZCONNECTIONLOSS)},
{"ZMARSHALLINGERROR", static_cast<Int8>(Coordination::Error::ZMARSHALLINGERROR)},
{"ZUNIMPLEMENTED", static_cast<Int8>(Coordination::Error::ZUNIMPLEMENTED)},
{"ZOPERATIONTIMEOUT", static_cast<Int8>(Coordination::Error::ZOPERATIONTIMEOUT)},
{"ZBADARGUMENTS", static_cast<Int8>(Coordination::Error::ZBADARGUMENTS)},
{"ZINVALIDSTATE", static_cast<Int8>(Coordination::Error::ZINVALIDSTATE)},
{"ZAPIERROR", static_cast<Int8>(Coordination::Error::ZAPIERROR)},
{"ZNONODE", static_cast<Int8>(Coordination::Error::ZNONODE)},
{"ZNOAUTH", static_cast<Int8>(Coordination::Error::ZNOAUTH)},
{"ZBADVERSION", static_cast<Int8>(Coordination::Error::ZBADVERSION)},
{"ZNOCHILDRENFOREPHEMERALS", static_cast<Int8>(Coordination::Error::ZNOCHILDRENFOREPHEMERALS)},
{"ZNODEEXISTS", static_cast<Int8>(Coordination::Error::ZNODEEXISTS)},
{"ZNOTEMPTY", static_cast<Int8>(Coordination::Error::ZNOTEMPTY)},
{"ZSESSIONEXPIRED", static_cast<Int8>(Coordination::Error::ZSESSIONEXPIRED)},
{"ZINVALIDCALLBACK", static_cast<Int8>(Coordination::Error::ZINVALIDCALLBACK)},
{"ZINVALIDACL", static_cast<Int8>(Coordination::Error::ZINVALIDACL)},
{"ZAUTHFAILED", static_cast<Int8>(Coordination::Error::ZAUTHFAILED)},
{"ZCLOSING", static_cast<Int8>(Coordination::Error::ZCLOSING)},
{"ZNOTHING", static_cast<Int8>(Coordination::Error::ZNOTHING)},
{"ZSESSIONMOVED", static_cast<Int8>(Coordination::Error::ZSESSIONMOVED)},
};
}
NamesAndTypesList StorageSystemDDLWorkerQueue::getNamesAndTypes()
{
return {
{"entry", std::make_shared<DataTypeString>()},
{"host_name", std::make_shared<DataTypeString>()},
{"host_address", std::make_shared<DataTypeString>()},
{"port", std::make_shared<DataTypeUInt16>()},
{"status", std::make_shared<DataTypeEnum8>(getStatusEnumsAndValues())},
{"cluster", std::make_shared<DataTypeString>()},
{"query", std::make_shared<DataTypeString>()},
{"initiator", std::make_shared<DataTypeString>()},
{"query_start_time", std::make_shared<DataTypeDateTime>()},
{"query_finish_time", std::make_shared<DataTypeDateTime>()},
{"query_duration_ms", std::make_shared<DataTypeUInt64>()},
{"exception_code", std::make_shared<DataTypeEnum8>(getZooKeeperErrorEnumsAndValues())},
{"entry", std::make_shared<DataTypeString>()},
{"entry_version", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>())},
{"initiator_host", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())},
{"initiator_port", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt16>())},
{"cluster", std::make_shared<DataTypeString>()},
{"query", std::make_shared<DataTypeString>()},
{"settings", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>())},
{"query_create_time", std::make_shared<DataTypeDateTime>()},
{"host", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())},
{"port", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt16>())},
{"status", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeEnum8>(getStatusEnumsAndValues()))},
{"exception_code", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt16>())},
{"exception_text", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())},
{"query_finish_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())},
{"query_duration_ms", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>())},
};
}
static String clusterNameFromDDLQuery(ContextPtr context, const DDLLogEntry & entry)
static String clusterNameFromDDLQuery(ContextPtr context, const DDLTask & task)
{
const char * begin = entry.query.data();
const char * end = begin + entry.query.size();
ASTPtr query;
ASTQueryWithOnCluster * query_on_cluster;
const char * begin = task.entry.query.data();
const char * end = begin + task.entry.query.size();
String cluster_name;
ParserQuery parser_query(end);
String description;
query = parseQuery(parser_query, begin, end, description, 0, context->getSettingsRef().max_parser_depth);
if (query && (query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(query.get())))
String description = fmt::format("from {}", task.entry_path);
ASTPtr query = parseQuery(parser_query, begin, end, description,
context->getSettingsRef().max_query_size, context->getSettingsRef().max_parser_depth);
if (const auto * query_on_cluster = dynamic_cast<const ASTQueryWithOnCluster *>(query.get()))
cluster_name = query_on_cluster->cluster;
return cluster_name;
}
/// Appends one value to each of the task-level ("common") columns, starting at index `col`:
/// entry, entry_version, initiator_host, initiator_port, cluster, query, settings, query_create_time.
/// `col` is advanced past every column that was written, so on return it indexes the first per-host column.
/// `query_create_time_ms` is in milliseconds; the column receives whole seconds.
static void fillCommonColumns(MutableColumns & res_columns, size_t & col, const DDLTask & task, const String & cluster_name, UInt64 query_create_time_ms)
{
    const auto & entry = task.entry;

    /// entry
    res_columns[col++]->insert(task.entry_name);

    /// entry_version
    res_columns[col++]->insert(entry.version);

    /// initiator_host and initiator_port: both NULL when the entry carries no initiator.
    if (!entry.initiator.empty())
    {
        HostID initiator_id = HostID::fromString(entry.initiator);
        res_columns[col++]->insert(initiator_id.host_name);
        res_columns[col++]->insert(initiator_id.port);
    }
    else
    {
        res_columns[col++]->insert(Field{});
        res_columns[col++]->insert(Field{});
    }

    /// cluster
    res_columns[col++]->insert(cluster_name);

    /// query
    res_columns[col++]->insert(entry.query);

    /// settings: one (name, stringified value) pair per setting change; empty map if there are none.
    Map settings_as_pairs;
    if (entry.settings)
    {
        for (const auto & setting : *entry.settings)
        {
            Tuple name_and_value;
            name_and_value.push_back(setting.name);
            name_and_value.push_back(toString(setting.value));
            settings_as_pairs.push_back(std::move(name_and_value));
        }
    }
    res_columns[col++]->insert(settings_as_pairs);

    /// query_create_time
    res_columns[col++]->insert(static_cast<UInt64>(query_create_time_ms / 1000));
}
/// Makes sure the first `num_filled_columns` columns have a value for the row that is about to be
/// completed. If they are already one row ahead of the next column, nothing is done; otherwise
/// (sizes equal) the last value of each of those columns is appended again.
static void repeatValuesInCommonColumns(MutableColumns & res_columns, size_t num_filled_columns)
{
    const size_t common_rows = res_columns[num_filled_columns - 1]->size();
    const size_t rest_rows = res_columns[num_filled_columns]->size();

    /// Common columns are already filled for the current row
    if (common_rows == rest_rows + 1)
        return;

    /// The only other expected state: the previous row is complete, so duplicate its common values
    assert(common_rows == rest_rows);

    for (size_t idx = 0; idx < num_filled_columns; ++idx)
    {
        auto & column = res_columns[idx];
        column->insert((*column)[column->size() - 1]);
    }
}
/// Appends the `host` and `port` values of `host_id` to the two columns starting at `col`
/// and advances `col` past them.
static void fillHostnameColumns(MutableColumns & res_columns, size_t & col, const HostID & host_id)
{
    /// NOTE host_id.host_name may be either a domain name or an IP address.
    /// Resolving (or reverse resolving) it into two separate columns would be possible,
    /// but it does not seem to be needed, so host_id.host_name is shown as is.

    auto & host_column = res_columns[col++];
    host_column->insert(host_id.host_name);

    auto & port_column = res_columns[col++];
    port_column->insert(host_id.port);
}
/// Writes `status` into the status column at `col` and NULL into the four columns that follow it
/// (exception_code, exception_text, query_finish_time, query_duration_ms), advancing `col` past all five.
static void fillStatusColumnsWithNulls(MutableColumns & res_columns, size_t & col, Status status)
{
    /// status
    res_columns[col++]->insert(static_cast<Int8>(status));

    /// exception_code, exception_text, query_finish_time, query_duration_ms
    constexpr size_t num_detail_columns = 4;
    for (size_t detail = 0; detail < num_detail_columns; ++detail)
        res_columns[col++]->insert(Field{});
}
/// Fills status, exception_code, exception_text, query_finish_time and query_duration_ms for a host
/// whose node was listed under "finished", using the response obtained from `finished_data_future`.
/// If that node no longer exists (ZNONODE), the host is reported as REMOVING with NULL details.
/// `query_create_time_ms` is the task creation time in milliseconds and is used to compute the duration.
static void fillStatusColumns(MutableColumns & res_columns, size_t & col,
                              GetResponseFuture & finished_data_future,
                              UInt64 query_create_time_ms)
{
    auto finished_response = finished_data_future.get();

    if (finished_response.error == Coordination::Error::ZNONODE)
    {
        fillStatusColumnsWithNulls(res_columns, col, Status::REMOVING);
        return;
    }

    /// asyncTryGet should throw on other error codes
    assert(finished_response.error == Coordination::Error::ZOK);

    /// status
    res_columns[col++]->insert(static_cast<Int8>(Status::FINISHED));

    /// The node data holds the serialized execution status of the query on that host
    auto host_result = ExecutionStatus::fromText(finished_response.data);

    /// exception_code
    res_columns[col++]->insert(host_result.code);

    /// exception_text
    res_columns[col++]->insert(host_result.message);

    /// Creation time of the finished node is taken as the moment the query finished
    UInt64 finish_time_ms = finished_response.stat.ctime;

    /// query_finish_time
    res_columns[col++]->insert(static_cast<UInt64>(finish_time_ms / 1000));

    /// query_duration_ms
    res_columns[col++]->insert(static_cast<UInt64>(finish_time_ms - query_create_time_ms));
}
void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
zkutil::ZooKeeperPtr zookeeper = context->getZooKeeper();
Coordination::Error zk_exception_code = Coordination::Error::ZOK;
String ddl_zookeeper_path = config.getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
String ddl_query_path;
fs::path ddl_zookeeper_path = context->getConfigRef().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
// this is equivalent to query zookeeper at the `ddl_zookeeper_path`
/* [zk: localhost:2181(CONNECTED) 51] ls /clickhouse/task_queue/ddl
[query-0000000000, query-0000000001, query-0000000002, query-0000000003, query-0000000004]
*/
Strings ddl_task_paths = zookeeper->getChildren(ddl_zookeeper_path);
zkutil::Strings queries;
GetResponseFutures ddl_task_futures;
ListResponseFutures active_nodes_futures;
ListResponseFutures finished_nodes_futures;
Coordination::Error code = zookeeper->tryGetChildren(ddl_zookeeper_path, queries);
// if there is an error here, just register the code.
// the queries will be empty and so there will be nothing to fill the table
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
zk_exception_code = code;
const auto clusters = context->getClusters();
for (const auto & name_and_cluster : clusters->getContainer())
for (const auto & task_path : ddl_task_paths)
{
const ClusterPtr & cluster = name_and_cluster.second;
const auto & shards_info = cluster->getShardsInfo();
const auto & addresses_with_failover = cluster->getShardsAddresses();
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
ddl_task_futures.push_back(zookeeper->asyncTryGet(ddl_zookeeper_path / task_path));
/// List status dirs. Active host may become finished, so we list active first.
active_nodes_futures.push_back(zookeeper->asyncTryGetChildrenNoThrow(ddl_zookeeper_path / task_path / "active"));
finished_nodes_futures.push_back(zookeeper->asyncTryGetChildrenNoThrow(ddl_zookeeper_path / task_path / "finished"));
}
for (size_t i = 0; i < ddl_task_paths.size(); ++i)
{
auto maybe_task = ddl_task_futures[i].get();
if (maybe_task.error != Coordination::Error::ZOK)
{
const auto & shard_addresses = addresses_with_failover[shard_index];
const auto & shard_info = shards_info[shard_index];
const auto pool_status = shard_info.pool->getStatus();
for (size_t replica_index = 0; replica_index < shard_addresses.size(); ++replica_index)
/// Task is removed
assert(maybe_task.error == Coordination::Error::ZNONODE);
continue;
}
DDLTask task{ddl_task_paths[i], ddl_zookeeper_path / ddl_task_paths[i]};
try
{
task.entry.parse(maybe_task.data);
}
catch (Exception & e)
{
e.addMessage("On parsing DDL entry {}: {}", task.entry_path, maybe_task.data);
throw;
}
String cluster_name = clusterNameFromDDLQuery(context, task);
UInt64 query_create_time_ms = maybe_task.stat.ctime;
size_t col = 0;
fillCommonColumns(res_columns, col, task, cluster_name, query_create_time_ms);
/// First we process finished nodes, to avoid duplication if some host was active
/// and suddenly became finished while the status dirs were being listed.
/// Then we process active (but not finished) hosts.
/// And then we process the remaining hosts from the task.entry.hosts list.
/// NOTE: It's not guaranteed that task.entry.hosts contains all host ids from status dirs.
std::unordered_set<String> processed_hosts;
/// A race condition with DDLWorker::cleanupQueue(...) is possible.
/// We may get an incorrect list of finished nodes if the task is currently being removed.
/// To avoid showing INACTIVE status for hosts that have actually executed the query,
/// we detect whether someone is removing the task and show the special REMOVING status.
/// We should also distinguish this from the case when status dirs are not created yet (extremely rare case).
bool is_removing_task = false;
auto maybe_finished_hosts = finished_nodes_futures[i].get();
if (maybe_finished_hosts.error == Coordination::Error::ZOK)
{
GetResponseFutures finished_status_futures;
for (const auto & host_id_str : maybe_finished_hosts.names)
finished_status_futures.push_back(zookeeper->asyncTryGet(fs::path(task.entry_path) / "finished" / host_id_str));
for (size_t host_idx = 0; host_idx < maybe_finished_hosts.names.size(); ++host_idx)
{
/* Dir contents of every query will be similar to
[zk: localhost:2181(CONNECTED) 53] ls /clickhouse/task_queue/ddl/query-0000000004
[active, finished]
*/
std::vector<std::future<Coordination::GetResponse>> futures;
futures.reserve(queries.size());
for (const String & q : queries)
{
futures.push_back(zookeeper->asyncTryGet(fs::path(ddl_zookeeper_path) / q));
}
for (size_t query_id = 0; query_id < queries.size(); query_id++)
{
Int64 query_finish_time = 0;
size_t i = 0;
res_columns[i++]->insert(queries[query_id]); // entry
const auto & address = shard_addresses[replica_index];
res_columns[i++]->insert(address.host_name);
auto resolved = address.getResolvedAddress();
res_columns[i++]->insert(resolved ? resolved->host().toString() : String()); // host_address
res_columns[i++]->insert(address.port);
ddl_query_path = fs::path(ddl_zookeeper_path) / queries[query_id];
zkutil::Strings active_nodes;
zkutil::Strings finished_nodes;
code = zookeeper->tryGetChildren(fs::path(ddl_query_path) / "active", active_nodes);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
zk_exception_code = code;
code = zookeeper->tryGetChildren(fs::path(ddl_query_path) / "finished", finished_nodes);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
zk_exception_code = code;
/* status:
* active: If the hostname:port entry is present under active path.
* finished: If the hostname:port entry is present under the finished path.
* errored: If the hostname:port entry is present under the finished path but the error count is not 0.
* unknown: If the above cases don't hold true, then status is unknown.
*/
if (std::find(active_nodes.begin(), active_nodes.end(), address.toString()) != active_nodes.end())
{
res_columns[i++]->insert(static_cast<Int8>(Status::ACTIVE));
}
else if (std::find(finished_nodes.begin(), finished_nodes.end(), address.toString()) != finished_nodes.end())
{
if (pool_status[replica_index].error_count != 0)
{
res_columns[i++]->insert(static_cast<Int8>(Status::ERRORED));
}
else
{
res_columns[i++]->insert(static_cast<Int8>(Status::FINISHED));
}
// regardless of the status finished or errored, the node host_name:port entry was found under the /finished path
// & should be able to get the contents of the znode at /finished path.
auto res_fn = zookeeper->asyncTryGet(fs::path(ddl_query_path) / "finished");
auto stat_fn = res_fn.get().stat;
query_finish_time = stat_fn.mtime;
}
else
{
res_columns[i++]->insert(static_cast<Int8>(Status::UNKNOWN));
}
Coordination::GetResponse res;
res = futures[query_id].get();
auto query_start_time = res.stat.mtime;
DDLLogEntry entry;
entry.parse(res.data);
String cluster_name = clusterNameFromDDLQuery(context, entry);
res_columns[i++]->insert(cluster_name);
res_columns[i++]->insert(entry.query);
res_columns[i++]->insert(entry.initiator);
res_columns[i++]->insert(UInt64(query_start_time / 1000));
res_columns[i++]->insert(UInt64(query_finish_time / 1000));
res_columns[i++]->insert(UInt64(query_finish_time - query_start_time));
res_columns[i++]->insert(static_cast<Int8>(zk_exception_code));
}
const auto & host_id_str = maybe_finished_hosts.names[host_idx];
HostID host_id = HostID::fromString(host_id_str);
repeatValuesInCommonColumns(res_columns, col);
size_t rest_col = col;
fillHostnameColumns(res_columns, rest_col, host_id);
fillStatusColumns(res_columns, rest_col, finished_status_futures[host_idx], query_create_time_ms);
processed_hosts.insert(host_id_str);
}
}
else if (maybe_finished_hosts.error == Coordination::Error::ZNONODE)
{
/// Rare case: Either status dirs are not created yet or already removed.
/// We can distinguish it by checking if task node exists, because "query-xxx" and "query-xxx/finished"
/// are removed in single multi-request
is_removing_task = !zookeeper->exists(task.entry_path);
}
else
{
throw Coordination::Exception(maybe_finished_hosts.error, fs::path(task.entry_path) / "finished");
}
/// Process active nodes
auto maybe_active_hosts = active_nodes_futures[i].get();
if (maybe_active_hosts.error == Coordination::Error::ZOK)
{
for (const auto & host_id_str : maybe_active_hosts.names)
{
if (processed_hosts.contains(host_id_str))
continue;
HostID host_id = HostID::fromString(host_id_str);
repeatValuesInCommonColumns(res_columns, col);
size_t rest_col = col;
fillHostnameColumns(res_columns, rest_col, host_id);
fillStatusColumnsWithNulls(res_columns, rest_col, Status::ACTIVE);
processed_hosts.insert(host_id_str);
}
}
else if (maybe_active_hosts.error == Coordination::Error::ZNONODE)
{
/// Rare case: Either status dirs are not created yet or task is currently removing.
/// When removing a task, at first we remove "query-xxx/active" (not recursively),
/// then recursively remove everything except "query-xxx/finished"
/// and then remove "query-xxx" and "query-xxx/finished".
is_removing_task = is_removing_task ||
(zookeeper->exists(fs::path(task.entry_path) / "finished") && !zookeeper->exists(fs::path(task.entry_path) / "active")) ||
!zookeeper->exists(task.entry_path);
}
else
{
throw Coordination::Exception(maybe_active_hosts.error, fs::path(task.entry_path) / "active");
}
/// Process the rest hosts
for (const auto & host_id : task.entry.hosts)
{
if (processed_hosts.contains(host_id.toString()))
continue;
Status status = is_removing_task ? Status::REMOVING : Status::INACTIVE;
repeatValuesInCommonColumns(res_columns, col);
size_t rest_col = col;
fillHostnameColumns(res_columns, rest_col, host_id);
fillStatusColumnsWithNulls(res_columns, rest_col, status);
processed_hosts.insert(host_id.toString());
}
if (processed_hosts.empty())
{
/// We don't know any hosts, just fill the rest columns with nulls.
/// host
res_columns[col++]->insert(Field{});
/// port
res_columns[col++]->insert(Field{});
fillStatusColumnsWithNulls(res_columns, col, Status::UNKNOWN);
}
}
}
}

View File

@ -1,12 +1,8 @@
#pragma once
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Interpreters/DDLWorker.h>
#include <Storages/System/IStorageSystemOneBlock.h>
#include <common/shared_ptr_helper.h>
#include <future>
namespace DB
{
@ -19,10 +15,9 @@ class StorageSystemDDLWorkerQueue final : public shared_ptr_helper<StorageSystem
public IStorageSystemOneBlock<StorageSystemDDLWorkerQueue>
{
friend struct shared_ptr_helper<StorageSystemDDLWorkerQueue>;
Poco::Util::LayeredConfiguration & config = Poco::Util::Application::instance().config();
protected:
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
using IStorageSystemOneBlock::IStorageSystemOneBlock;

View File

@ -3,7 +3,7 @@ Received exception from server:
Code: 57. Error: Received from localhost:9000. Error: There was an error on [localhost:9000]: Code: 57. Error: Table default.none already exists. (TABLE_ALREADY_EXISTS)
(query: create table none on cluster test_shard_localhost (n int) engine=Memory;)
Received exception from server:
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=3) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
(query: drop table if exists none on cluster test_unavailable_shard;)
throw
localhost 9000 0 0 0
@ -12,7 +12,7 @@ Code: 57. Error: Received from localhost:9000. Error: There was an error on [loc
(query: create table throw on cluster test_shard_localhost (n int) engine=Memory format Null;)
localhost 9000 0 1 0
Received exception from server:
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=3) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
(query: drop table if exists throw on cluster test_unavailable_shard;)
null_status_on_timeout
localhost 9000 0 0 0
@ -26,3 +26,20 @@ localhost 9000 0 0 0
localhost 9000 57 Code: 57. Error: Table default.never_throw already exists. (TABLE_ALREADY_EXISTS) 0 0
localhost 9000 0 1 0
localhost 1 \N \N 1 0
distributed_ddl_queue
2 localhost 9000 test_shard_localhost CREATE TABLE default.none ON CLUSTER test_shard_localhost (`n` int) ENGINE = Memory 1 localhost 9000 Finished 0 1 1
2 localhost 9000 test_shard_localhost CREATE TABLE default.none ON CLUSTER test_shard_localhost (`n` int) ENGINE = Memory 1 localhost 9000 Finished 57 Code: 57. DB::Error: Table default.none already exists. (TABLE_ALREADY_EXISTS) 1 1
2 localhost 9000 test_unavailable_shard DROP TABLE IF EXISTS default.none ON CLUSTER test_unavailable_shard 1 localhost 1 Inactive \N \N \N \N
2 localhost 9000 test_unavailable_shard DROP TABLE IF EXISTS default.none ON CLUSTER test_unavailable_shard 1 localhost 9000 Finished 0 1 1
2 localhost 9000 test_shard_localhost CREATE TABLE default.throw ON CLUSTER test_shard_localhost (`n` int) ENGINE = Memory 1 localhost 9000 Finished 0 1 1
2 localhost 9000 test_shard_localhost CREATE TABLE default.throw ON CLUSTER test_shard_localhost (`n` int) ENGINE = Memory 1 localhost 9000 Finished 57 Code: 57. DB::Error: Table default.throw already exists. (TABLE_ALREADY_EXISTS) 1 1
2 localhost 9000 test_unavailable_shard DROP TABLE IF EXISTS default.throw ON CLUSTER test_unavailable_shard 1 localhost 1 Inactive \N \N \N \N
2 localhost 9000 test_unavailable_shard DROP TABLE IF EXISTS default.throw ON CLUSTER test_unavailable_shard 1 localhost 9000 Finished 0 1 1
2 localhost 9000 test_shard_localhost CREATE TABLE default.null_status ON CLUSTER test_shard_localhost (`n` int) ENGINE = Memory 1 localhost 9000 Finished 0 1 1
2 localhost 9000 test_shard_localhost CREATE TABLE default.null_status ON CLUSTER test_shard_localhost (`n` int) ENGINE = Memory 1 localhost 9000 Finished 57 Code: 57. DB::Error: Table default.null_status already exists. (TABLE_ALREADY_EXISTS) 1 1
2 localhost 9000 test_unavailable_shard DROP TABLE IF EXISTS default.null_status ON CLUSTER test_unavailable_shard 1 localhost 1 Inactive \N \N \N \N
2 localhost 9000 test_unavailable_shard DROP TABLE IF EXISTS default.null_status ON CLUSTER test_unavailable_shard 1 localhost 9000 Finished 0 1 1
2 localhost 9000 test_shard_localhost CREATE TABLE default.never_throw ON CLUSTER test_shard_localhost (`n` int) ENGINE = Memory 1 localhost 9000 Finished 0 1 1
2 localhost 9000 test_shard_localhost CREATE TABLE default.never_throw ON CLUSTER test_shard_localhost (`n` int) ENGINE = Memory 1 localhost 9000 Finished 57 Code: 57. DB::Error: Table default.never_throw already exists. (TABLE_ALREADY_EXISTS) 1 1
2 localhost 9000 test_unavailable_shard DROP TABLE IF EXISTS default.never_throw ON CLUSTER test_unavailable_shard 1 localhost 1 Inactive \N \N \N \N
2 localhost 9000 test_unavailable_shard DROP TABLE IF EXISTS default.never_throw ON CLUSTER test_unavailable_shard 1 localhost 9000 Finished 0 1 1

View File

@ -30,44 +30,65 @@ function run_until_out_contains()
done
}
# Build the client command lines used by the scenarios below.
# RAND_COMMENT is a per-run tag ($RANDOM-based); it is embedded into the log_comment so that the
# final system.distributed_ddl_queue query can filter on it.
RAND_COMMENT="01175_DDL_$RANDOM"
LOG_COMMENT="${CLICKHOUSE_LOG_COMMENT}_$RAND_COMMENT"
# Substitute the first --log_comment='<CLICKHOUSE_LOG_COMMENT>' occurrence inside $CLICKHOUSE_CLIENT
# with the tagged comment. If the pattern does not occur, the value is copied unchanged.
CLICKHOUSE_CLIENT_WITH_SETTINGS=${CLICKHOUSE_CLIENT/--log_comment=\'${CLICKHOUSE_LOG_COMMENT}\'/--log_comment=\'${LOG_COMMENT}\'}
# Settings shared by both client variants.
CLICKHOUSE_CLIENT_WITH_SETTINGS+=" --output_format_parallel_formatting=0 "
CLICKHOUSE_CLIENT_WITH_SETTINGS+=" --distributed_ddl_entry_format_version=2 "
# CLIENT: shared settings plus distributed_ddl_task_timeout=300.
CLIENT=${CLICKHOUSE_CLIENT_WITH_SETTINGS}
CLIENT+=" --distributed_ddl_task_timeout=300 "
# CLIENT_TIMEOUT: shared settings plus distributed_ddl_task_timeout=3; used below for the
# queries against cluster test_unavailable_shard.
CLIENT_TIMEOUT=${CLICKHOUSE_CLIENT_WITH_SETTINGS}
CLIENT_TIMEOUT+=" --distributed_ddl_task_timeout=3 "
# Drop the four test tables (one per output mode) if they exist, before the scenarios start.
$CLICKHOUSE_CLIENT -q "drop table if exists none;"
$CLICKHOUSE_CLIENT -q "drop table if exists throw;"
$CLICKHOUSE_CLIENT -q "drop table if exists null_status;"
$CLICKHOUSE_CLIENT -q "drop table if exists never_throw;"
# Scenarios for distributed_ddl_output_mode=none.
# NOTE(review): every command in this section appears twice - once with explicit $CLICKHOUSE_CLIENT
# flags and once via $CLIENT / $CLIENT_TIMEOUT. This looks like the before/after sides of a diff
# rendered without +/- markers; confirm which variant is meant to run before executing as-is.
# Print the effective value of the setting.
$CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=600 --distributed_ddl_output_mode=none -q "select value from system.settings where name='distributed_ddl_output_mode';"
$CLIENT --distributed_ddl_output_mode=none -q "select value from system.settings where name='distributed_ddl_output_mode';"
# Ok
$CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=600 --distributed_ddl_output_mode=none -q "create table none on cluster test_shard_localhost (n int) engine=Memory;"
$CLIENT --distributed_ddl_output_mode=none -q "create table none on cluster test_shard_localhost (n int) engine=Memory;"
# Table exists
# Same CREATE again; stderr is merged into stdout and the text is normalized by sed:
# "DB::Exception" becomes "Error" and the " (version...)" part is removed.
$CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=600 --distributed_ddl_output_mode=none -q "create table none on cluster test_shard_localhost (n int) engine=Memory;" 2>&1 | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//"
$CLIENT --distributed_ddl_output_mode=none -q "create table none on cluster test_shard_localhost (n int) engine=Memory;" 2>&1 | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//"
# Timeout
# DROP on cluster test_unavailable_shard with a short task timeout. run_until_out_contains is
# defined earlier in the file; presumably it reruns the command until the output contains the
# given text - confirm. The last sed replaces the task name in the "Watching task ... is executing
# longer" message with the placeholder <task>.
run_until_out_contains 'There are 1 unfinished hosts' $CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=1 --distributed_ddl_output_mode=none -q "drop table if exists none on cluster test_unavailable_shard;" 2>&1 | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/Watching task .* is executing longer/Watching task <task> is executing longer/"
run_until_out_contains 'There are 1 unfinished hosts' $CLIENT_TIMEOUT --distributed_ddl_output_mode=none -q "drop table if exists none on cluster test_unavailable_shard;" 2>&1 | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/Watching task .* is executing longer/Watching task <task> is executing longer/"
# Scenarios for distributed_ddl_output_mode=throw: print the setting, CREATE the table, repeat the
# CREATE (with "format Null", error text normalized by sed), then DROP on test_unavailable_shard.
# NOTE(review): commands appear both with explicit $CLICKHOUSE_CLIENT flags and via
# $CLIENT / $CLIENT_TIMEOUT, grouped old-then-new; this looks like a diff rendered without +/-
# markers - confirm which variant is meant to run.
$CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=600 --distributed_ddl_output_mode=throw -q "select value from system.settings where name='distributed_ddl_output_mode';"
$CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=600 --distributed_ddl_output_mode=throw -q "create table throw on cluster test_shard_localhost (n int) engine=Memory;"
$CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=600 --distributed_ddl_output_mode=throw -q "create table throw on cluster test_shard_localhost (n int) engine=Memory format Null;" 2>&1 | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//"
$CLIENT --distributed_ddl_output_mode=throw -q "select value from system.settings where name='distributed_ddl_output_mode';"
$CLIENT --distributed_ddl_output_mode=throw -q "create table throw on cluster test_shard_localhost (n int) engine=Memory;"
$CLIENT --distributed_ddl_output_mode=throw -q "create table throw on cluster test_shard_localhost (n int) engine=Memory format Null;" 2>&1 | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//"
# DROP against test_unavailable_shard with a short task timeout; the sed chain renames
# "DB::Exception" to "Error", strips " (version...)" and masks the task name with <task>.
# run_until_out_contains is defined earlier in the file (presumably a retry-until-match helper - confirm).
run_until_out_contains 'There are 1 unfinished hosts' $CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=1 --distributed_ddl_output_mode=throw -q "drop table if exists throw on cluster test_unavailable_shard;" 2>&1 | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/Watching task .* is executing longer/Watching task <task> is executing longer/"
run_until_out_contains 'There are 1 unfinished hosts' $CLIENT_TIMEOUT --distributed_ddl_output_mode=throw -q "drop table if exists throw on cluster test_unavailable_shard;" 2>&1 | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/Watching task .* is executing longer/Watching task <task> is executing longer/"
# Scenarios for distributed_ddl_output_mode=null_status_on_timeout: print the setting, CREATE the
# table, repeat the CREATE (with "format Null", error text normalized by sed), then DROP on
# test_unavailable_shard.
# NOTE(review): commands appear both with explicit $CLICKHOUSE_CLIENT flags and via
# $CLIENT / $CLIENT_TIMEOUT, grouped old-then-new; this looks like a diff rendered without +/-
# markers - confirm which variant is meant to run.
$CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=600 --distributed_ddl_output_mode=null_status_on_timeout -q "select value from system.settings where name='distributed_ddl_output_mode';"
$CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=600 --distributed_ddl_output_mode=null_status_on_timeout -q "create table null_status on cluster test_shard_localhost (n int) engine=Memory;"
$CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=600 --distributed_ddl_output_mode=null_status_on_timeout -q "create table null_status on cluster test_shard_localhost (n int) engine=Memory format Null;" 2>&1 | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//"
$CLIENT --distributed_ddl_output_mode=null_status_on_timeout -q "select value from system.settings where name='distributed_ddl_output_mode';"
$CLIENT --distributed_ddl_output_mode=null_status_on_timeout -q "create table null_status on cluster test_shard_localhost (n int) engine=Memory;"
$CLIENT --distributed_ddl_output_mode=null_status_on_timeout -q "create table null_status on cluster test_shard_localhost (n int) engine=Memory format Null;" 2>&1 | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//"
# DROP against test_unavailable_shard with a short task timeout. Here the expected text passed to
# run_until_out_contains is '9000 0 ' and the output is not post-processed by sed.
# run_until_out_contains is defined earlier in the file (presumably a retry-until-match helper - confirm).
run_until_out_contains '9000 0 ' $CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=1 --distributed_ddl_output_mode=null_status_on_timeout -q "drop table if exists null_status on cluster test_unavailable_shard;"
run_until_out_contains '9000 0 ' $CLIENT_TIMEOUT --distributed_ddl_output_mode=null_status_on_timeout -q "drop table if exists null_status on cluster test_unavailable_shard;"
# Scenarios for distributed_ddl_output_mode=never_throw: print the setting, CREATE the table,
# repeat the CREATE (no "format Null" here; error text normalized by sed), then DROP on
# test_unavailable_shard.
# NOTE(review): commands appear both with explicit $CLICKHOUSE_CLIENT flags and via
# $CLIENT / $CLIENT_TIMEOUT, grouped old-then-new; this looks like a diff rendered without +/-
# markers - confirm which variant is meant to run.
$CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=600 --distributed_ddl_output_mode=never_throw -q "select value from system.settings where name='distributed_ddl_output_mode';"
$CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=600 --distributed_ddl_output_mode=never_throw -q "create table never_throw on cluster test_shard_localhost (n int) engine=Memory;"
$CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=600 --distributed_ddl_output_mode=never_throw -q "create table never_throw on cluster test_shard_localhost (n int) engine=Memory;" 2>&1 | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//"
$CLIENT --distributed_ddl_output_mode=never_throw -q "select value from system.settings where name='distributed_ddl_output_mode';"
$CLIENT --distributed_ddl_output_mode=never_throw -q "create table never_throw on cluster test_shard_localhost (n int) engine=Memory;"
$CLIENT --distributed_ddl_output_mode=never_throw -q "create table never_throw on cluster test_shard_localhost (n int) engine=Memory;" 2>&1 | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//"
# DROP against test_unavailable_shard with a short task timeout. The expected text passed to
# run_until_out_contains is '9000 0 ' and the output is not post-processed by sed.
# run_until_out_contains is defined earlier in the file (presumably a retry-until-match helper - confirm).
run_until_out_contains '9000 0 ' $CLICKHOUSE_CLIENT --output_format_parallel_formatting=0 --distributed_ddl_task_timeout=1 --distributed_ddl_output_mode=never_throw -q "drop table if exists never_throw on cluster test_unavailable_shard;"
run_until_out_contains '9000 0 ' $CLIENT_TIMEOUT --distributed_ddl_output_mode=never_throw -q "drop table if exists never_throw on cluster test_unavailable_shard;"
# Drop the four test tables again now that all output-mode scenarios have run.
$CLICKHOUSE_CLIENT -q "drop table if exists none;"
$CLICKHOUSE_CLIENT -q "drop table if exists throw;"
$CLICKHOUSE_CLIENT -q "drop table if exists null_status;"
$CLICKHOUSE_CLIENT -q "drop table if exists never_throw;"
# Print a section marker, then dump entries from system.distributed_ddl_queue.
# The query makes the output stable across runs:
#   - the "UUID '<36 hex/dash chars>' " fragment is removed from the query text;
#   - timestamps and durations are reduced to boolean checks (creation time within 600 of now(),
#     finish time consistent with create time + query_duration_ms, duration below 600000);
#   - exception_text has everything from " (version" onwards removed and "Exception" replaced by "Error".
# Only entries whose settings map has a log_comment value containing $RAND_COMMENT are selected.
$CLICKHOUSE_CLIENT -q "select 'distributed_ddl_queue'"
$CLICKHOUSE_CLIENT -q "select entry_version, initiator_host, initiator_port, cluster, replaceRegexpOne(query, 'UUID \'[0-9a-f\-]{36}\' ', ''), abs(query_create_time - now()) < 600,
host, port, status, exception_code, replace(replaceRegexpOne(exception_text, ' \(version.*', ''), 'Exception', 'Error'), abs(query_finish_time - query_create_time - query_duration_ms/1000) <= 1 , query_duration_ms < 600000
from system.distributed_ddl_queue
where arrayExists((key, val) -> key='log_comment' and val like '%$RAND_COMMENT%', mapKeys(settings), mapValues(settings))
order by entry, host, port, exception_code"