better table with DDDL status

This commit is contained in:
Alexander Tokmakov 2022-05-11 16:09:14 +02:00
parent 897a0393b2
commit 9e37df7989

View File

@ -12,9 +12,11 @@
#include <Access/ContextAccess.h>
#include <Common/Macros.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Databases/DatabaseReplicated.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeEnum.h>
#include <Processors/Sinks/EmptySink.h>
#include <QueryPipeline/Pipe.h>
#include <filesystem>
@ -203,10 +205,24 @@ public:
private:
static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path);
static Block getSampleBlock(ContextPtr context_, bool hosts_to_wait);
Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts);
std::pair<String, UInt16> parseHostAndPort(const String & host_id) const;
Chunk generateChunkWithUnfinishedHosts() const;
enum ReplicatedDatabaseQueryStatus
{
/// Query is (successfully) finished
OK = 0,
/// Query is not finished yet, but replica is currently executing it
IN_PROGRESS = 1,
/// Replica is not available or busy with previous queries. It will process query asynchronously
QUEUED = 2,
};
String node_path;
ContextPtr context;
Stopwatch watch;
@ -222,7 +238,7 @@ private:
std::unique_ptr<Exception> first_exception;
Int64 timeout_seconds = 120;
bool by_hostname = true;
bool is_replicated_database = false;
bool throw_on_timeout = true;
bool timeout_exceeded = false;
};
@ -243,7 +259,7 @@ BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & en
return io;
}
static Block getSampleBlock(ContextPtr context_, bool hosts_to_wait)
Block DDLQueryStatusSource::getSampleBlock(ContextPtr context_, bool hosts_to_wait)
{
auto output_mode = context_->getSettingsRef().distributed_ddl_output_mode;
@ -254,19 +270,38 @@ static Block getSampleBlock(ContextPtr context_, bool hosts_to_wait)
return std::make_shared<DataTypeNullable>(type);
};
Block res = Block{
{std::make_shared<DataTypeString>(), "host"},
{std::make_shared<DataTypeUInt16>(), "port"},
{maybe_make_nullable(std::make_shared<DataTypeInt64>()), "status"},
{maybe_make_nullable(std::make_shared<DataTypeString>()), "error"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
auto get_status_enum = []()
{
return std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"OK", static_cast<Int8>(OK)},
{"IN_PROGRESS", static_cast<Int8>(IN_PROGRESS)},
{"QUEUED", static_cast<Int8>(QUEUED)},
});
};
if (hosts_to_wait)
res.erase("port");
return res;
{
return Block{
{std::make_shared<DataTypeString>(), "shard"},
{std::make_shared<DataTypeString>(), "replica"},
{get_status_enum(), "status"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
};
}
else
{
return Block{
{std::make_shared<DataTypeString>(), "host"},
{std::make_shared<DataTypeUInt16>(), "port"},
{maybe_make_nullable(std::make_shared<DataTypeInt64>()), "status"},
{maybe_make_nullable(std::make_shared<DataTypeString>()), "error"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
};
}
}
DDLQueryStatusSource::DDLQueryStatusSource(
@ -283,7 +318,7 @@ DDLQueryStatusSource::DDLQueryStatusSource(
if (hosts_to_wait)
{
waiting_hosts = NameSet(hosts_to_wait->begin(), hosts_to_wait->end());
by_hostname = false;
is_replicated_database = true;
}
else
{
@ -299,7 +334,7 @@ std::pair<String, UInt16> DDLQueryStatusSource::parseHostAndPort(const String &
{
String host = host_id;
UInt16 port = 0;
if (by_hostname)
if (!is_replicated_database)
{
auto host_and_port = Cluster::Address::fromString(host_id);
host = host_and_port.first;
@ -308,6 +343,43 @@ std::pair<String, UInt16> DDLQueryStatusSource::parseHostAndPort(const String &
return {host, port};
}
Chunk DDLQueryStatusSource::generateChunkWithUnfinishedHosts() const
{
NameSet unfinished_hosts = waiting_hosts;
for (const auto & host_id : finished_hosts)
unfinished_hosts.erase(host_id);
NameSet active_hosts_set = NameSet{current_active_hosts.begin(), current_active_hosts.end()};
/// Query is not finished on the rest hosts, so fill the corresponding rows with NULLs.
MutableColumns columns = output.getHeader().cloneEmptyColumns();
for (const String & host_id : unfinished_hosts)
{
size_t num = 0;
if (is_replicated_database)
{
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard);
columns[num++]->insert(replica);
if (active_hosts_set.contains(host_id))
columns[num++]->insert(IN_PROGRESS);
else
columns[num++]->insert(QUEUED);
}
else
{
auto [host, port] = parseHostAndPort(host_id);
columns[num++]->insert(host);
columns[num++]->insert(port);
columns[num++]->insert(Field{});
columns[num++]->insert(Field{});
}
columns[num++]->insert(unfinished_hosts.size());
columns[num++]->insert(current_active_hosts.size());
}
return Chunk(std::move(columns), unfinished_hosts.size());
}
Chunk DDLQueryStatusSource::generate()
{
bool all_hosts_finished = num_hosts_finished >= waiting_hosts.size();
@ -342,30 +414,16 @@ Chunk DDLQueryStatusSource::generate()
first_exception = std::make_unique<Exception>(
fmt::format(msg_format, node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts),
ErrorCodes::TIMEOUT_EXCEEDED);
/// For Replicated database print a list of unfinished hosts as well. Will return empty block on next iteration.
if (is_replicated_database)
return generateChunkWithUnfinishedHosts();
return {};
}
LOG_INFO(log, msg_format, node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);
NameSet unfinished_hosts = waiting_hosts;
for (const auto & host_id : finished_hosts)
unfinished_hosts.erase(host_id);
/// Query is not finished on the rest hosts, so fill the corresponding rows with NULLs.
MutableColumns columns = output.getHeader().cloneEmptyColumns();
for (const String & host_id : unfinished_hosts)
{
auto [host, port] = parseHostAndPort(host_id);
size_t num = 0;
columns[num++]->insert(host);
if (by_hostname)
columns[num++]->insert(port);
columns[num++]->insert(Field{});
columns[num++]->insert(Field{});
columns[num++]->insert(num_unfinished_hosts);
columns[num++]->insert(num_active_hosts);
}
return Chunk(std::move(columns), unfinished_hosts.size());
return generateChunkWithUnfinishedHosts();
}
if (num_hosts_finished != 0 || try_number != 0)
@ -404,11 +462,15 @@ Chunk DDLQueryStatusSource::generate()
status.tryDeserializeText(status_data);
}
auto [host, port] = parseHostAndPort(host_id);
if (status.code != 0 && !first_exception
&& context->getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW)
{
/// Replicated database retries in case of error, it should not write error status.
if (is_replicated_database)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
auto [host, port] = parseHostAndPort(host_id);
first_exception = std::make_unique<Exception>(
fmt::format("There was an error on [{}:{}]: {}", host, port, status.message), status.code);
}
@ -416,11 +478,23 @@ Chunk DDLQueryStatusSource::generate()
++num_hosts_finished;
size_t num = 0;
columns[num++]->insert(host);
if (by_hostname)
if (is_replicated_database)
{
if (status.code != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard);
columns[num++]->insert(replica);
columns[num++]->insert(OK);
}
else
{
auto [host, port] = parseHostAndPort(host_id);
columns[num++]->insert(host);
columns[num++]->insert(port);
columns[num++]->insert(status.code);
columns[num++]->insert(status.message);
columns[num++]->insert(status.code);
columns[num++]->insert(status.message);
}
columns[num++]->insert(waiting_hosts.size() - num_hosts_finished);
columns[num++]->insert(current_active_hosts.size());
}