2020-11-03 13:47:26 +00:00
|
|
|
#include <Interpreters/executeDDLQueryOnCluster.h>
|
|
|
|
#include <Interpreters/DDLWorker.h>
|
|
|
|
#include <Interpreters/DDLTask.h>
|
|
|
|
#include <Interpreters/AddDefaultDatabaseVisitor.h>
|
2021-01-18 14:09:39 +00:00
|
|
|
#include <Interpreters/Context.h>
|
2020-11-03 13:47:26 +00:00
|
|
|
#include <Parsers/ASTQueryWithOutput.h>
|
|
|
|
#include <Parsers/ASTQueryWithOnCluster.h>
|
|
|
|
#include <Parsers/ASTAlterQuery.h>
|
2021-11-26 15:49:40 +00:00
|
|
|
#include <Parsers/ASTIdentifier.h>
|
2020-11-03 13:47:26 +00:00
|
|
|
#include <Parsers/queryToString.h>
|
2021-10-31 08:51:20 +00:00
|
|
|
#include <Access/Common/AccessRightsElement.h>
|
2020-11-03 13:47:26 +00:00
|
|
|
#include <Access/ContextAccess.h>
|
|
|
|
#include <Common/Macros.h>
|
|
|
|
#include <Common/ZooKeeper/ZooKeeper.h>
|
|
|
|
#include <DataTypes/DataTypesNumber.h>
|
|
|
|
#include <DataTypes/DataTypeString.h>
|
2021-03-08 20:35:09 +00:00
|
|
|
#include <DataTypes/DataTypeNullable.h>
|
2021-07-23 14:25:35 +00:00
|
|
|
#include <Processors/Sinks/EmptySink.h>
|
2021-10-16 14:03:50 +00:00
|
|
|
#include <QueryPipeline/Pipe.h>
|
2021-01-18 14:09:39 +00:00
|
|
|
#include <filesystem>
|
2022-01-30 19:49:48 +00:00
|
|
|
#include <base/sort.h>
|
2020-11-03 13:47:26 +00:00
|
|
|
|
2021-07-17 21:45:07 +00:00
|
|
|
|
2021-01-18 14:09:39 +00:00
|
|
|
namespace fs = std::filesystem;
|
2020-11-03 13:47:26 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int NOT_IMPLEMENTED;
|
|
|
|
extern const int TIMEOUT_EXCEEDED;
|
|
|
|
extern const int UNFINISHED;
|
|
|
|
extern const int QUERY_IS_PROHIBITED;
|
2020-11-29 11:45:32 +00:00
|
|
|
extern const int LOGICAL_ERROR;
|
2020-11-03 13:47:26 +00:00
|
|
|
}
|
|
|
|
|
2020-11-05 09:52:23 +00:00
|
|
|
bool isSupportedAlterType(int type)
|
2020-11-03 13:47:26 +00:00
|
|
|
{
|
2021-01-26 17:51:25 +00:00
|
|
|
assert(type != ASTAlterCommand::NO_TYPE);
|
2020-11-03 13:47:26 +00:00
|
|
|
static const std::unordered_set<int> unsupported_alter_types{
|
2021-02-03 20:02:37 +00:00
|
|
|
/// It's dangerous, because it may duplicate data if executed on multiple replicas. We can allow it after #18978
|
2020-11-03 13:47:26 +00:00
|
|
|
ASTAlterCommand::ATTACH_PARTITION,
|
2021-01-26 17:51:25 +00:00
|
|
|
/// Usually followed by ATTACH PARTITION
|
2020-11-03 13:47:26 +00:00
|
|
|
ASTAlterCommand::FETCH_PARTITION,
|
2021-01-26 17:51:25 +00:00
|
|
|
/// Logical error
|
2020-11-03 13:47:26 +00:00
|
|
|
ASTAlterCommand::NO_TYPE,
|
|
|
|
};
|
|
|
|
|
2022-04-18 10:18:43 +00:00
|
|
|
return !unsupported_alter_types.contains(type);
|
2020-11-03 13:47:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2022-04-22 12:15:29 +00:00
|
|
|
/// Puts an ON CLUSTER DDL query into the distributed DDL queue and returns the BlockIO produced by
/// getDistributedDDLStatus() for the enqueued task.
///
/// All modifications are applied to a clone of `query_ptr_`. The target cluster is `params.cluster` if it is set,
/// otherwise the cluster named in the query (after macro expansion). Hosts are selected with
/// `params.only_shard_num` / `params.only_replica_num`, and `params.access_to_check` is checked on the initiator
/// after empty database names in it have been substituted.
///
/// Throws:
///  - NOT_IMPLEMENTED: inside a transaction (when throw_on_unsupported_query_inside_transaction is set), for queries
///    that are not ASTQueryWithOnCluster, for unsupported ALTER types, or when hosts with and without a default
///    database are mixed;
///  - QUERY_IS_PROHIBITED: when the allow_distributed_ddl setting is off;
///  - LOGICAL_ERROR: when no host matches the shard/replica filter.
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context, const DDLQueryOnClusterParams & params)
{
    if (context->getCurrentTransaction() && context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "ON CLUSTER queries inside transactions are not supported");

    /// Remove FORMAT <fmt> and INTO OUTFILE <file> if exists
    ASTPtr query_ptr = query_ptr_->clone();
    ASTQueryWithOutput::resetOutputASTIfExist(*query_ptr);

    // XXX: serious design flaw since `ASTQueryWithOnCluster` is not inherited from `IAST`!
    auto * query = dynamic_cast<ASTQueryWithOnCluster *>(query_ptr.get());
    if (!query)
    {
        throw Exception("Distributed execution is not supported for such DDL queries", ErrorCodes::NOT_IMPLEMENTED);
    }

    if (!context->getSettingsRef().allow_distributed_ddl)
        throw Exception("Distributed DDL queries are prohibited for the user", ErrorCodes::QUERY_IS_PROHIBITED);

    /// Every command of an ALTER query has to be allowed for distributed execution.
    if (const auto * query_alter = query_ptr->as<ASTAlterQuery>())
    {
        for (const auto & command : query_alter->command_list->children)
        {
            if (!isSupportedAlterType(command->as<ASTAlterCommand&>().type))
                throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);
        }
    }

    /// A cluster passed explicitly in params takes precedence over the cluster name written in the query.
    ClusterPtr cluster = params.cluster;
    if (!cluster)
    {
        query->cluster = context->getMacros()->expand(query->cluster);
        cluster = context->getCluster(query->cluster);
    }

    /// TODO: support per-cluster grant
    context->checkAccess(AccessType::CLUSTER);

    DDLWorker & ddl_worker = context->getDDLWorker();

    /// Enumerate hosts which will be used to send query.
    auto addresses = cluster->filterAddressesByShardOrReplica(params.only_shard_num, params.only_replica_num);
    if (addresses.empty())
        throw Exception("No hosts defined to execute distributed DDL query", ErrorCodes::LOGICAL_ERROR);

    std::vector<HostID> hosts;
    hosts.reserve(addresses.size()); /// The final size is known, so allocate once.
    for (const auto * address : addresses)
        hosts.emplace_back(*address);

    /// The current database in a distributed query need to be replaced with either
    /// the local current database or a shard's default database.
    AccessRightsElements access_to_check = params.access_to_check;
    bool need_replace_current_database = std::any_of(
        access_to_check.begin(),
        access_to_check.end(),
        [](const AccessRightsElement & elem) { return elem.isEmptyDatabase(); });

    bool use_local_default_database = false;
    const String & current_database = context->getCurrentDatabase();

    if (need_replace_current_database)
    {
        /// Collect the distinct default databases configured for the selected hosts.
        /// A host without one makes us fall back to the initiator's current database.
        Strings host_default_databases;
        for (const auto * address : addresses)
        {
            if (!address->default_database.empty())
                host_default_databases.push_back(address->default_database);
            else
                use_local_default_database = true;
        }
        ::sort(host_default_databases.begin(), host_default_databases.end());
        host_default_databases.erase(std::unique(host_default_databases.begin(), host_default_databases.end()), host_default_databases.end());
        assert(use_local_default_database || !host_default_databases.empty());

        if (use_local_default_database && !host_default_databases.empty())
            throw Exception("Mixed local default DB and shard default DB in DDL query", ErrorCodes::NOT_IMPLEMENTED);

        if (use_local_default_database)
        {
            access_to_check.replaceEmptyDatabase(current_database);
        }
        else
        {
            /// Expand every element with an empty database into one element per host default database.
            const size_t num_databases = host_default_databases.size();
            for (size_t i = 0; i != access_to_check.size();)
            {
                if (!access_to_check[i].isEmptyDatabase())
                {
                    ++i;
                    continue;
                }

                /// Work on a copy rather than on a reference into the vector that is being resized.
                const AccessRightsElement element = access_to_check[i];
                access_to_check.insert(access_to_check.begin() + i + 1, num_databases - 1, element);
                for (size_t j = 0; j != num_databases; ++j)
                    access_to_check[i + j].replaceEmptyDatabase(host_default_databases[j]);
                i += num_databases;
            }
        }
    }

    AddDefaultDatabaseVisitor visitor(context, current_database, !use_local_default_database);
    visitor.visitDDL(query_ptr);

    /// Check access rights, assume that all servers have the same users config
    context->checkAccess(access_to_check);

    DDLLogEntry entry;
    entry.hosts = std::move(hosts);
    entry.query = queryToString(query_ptr);
    entry.initiator = ddl_worker.getCommonHostID();
    entry.setSettingsIfRequired(context);
    String node_path = ddl_worker.enqueueQuery(entry);

    return getDistributedDDLStatus(node_path, entry, context);
}
|
|
|
|
|
2021-07-22 00:38:28 +00:00
|
|
|
|
2022-05-20 19:49:31 +00:00
|
|
|
class DDLQueryStatusSource final : public ISource
|
2021-07-22 00:38:28 +00:00
|
|
|
{
|
|
|
|
public:
|
|
|
|
DDLQueryStatusSource(
|
|
|
|
const String & zk_node_path, const DDLLogEntry & entry, ContextPtr context_, const std::optional<Strings> & hosts_to_wait = {});
|
|
|
|
|
|
|
|
String getName() const override { return "DDLQueryStatus"; }
|
|
|
|
Chunk generate() override;
|
|
|
|
Status prepare() override;
|
|
|
|
|
|
|
|
private:
|
|
|
|
static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path);
|
|
|
|
|
|
|
|
Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts);
|
|
|
|
|
|
|
|
std::pair<String, UInt16> parseHostAndPort(const String & host_id) const;
|
|
|
|
|
|
|
|
String node_path;
|
|
|
|
ContextPtr context;
|
|
|
|
Stopwatch watch;
|
|
|
|
Poco::Logger * log;
|
|
|
|
|
|
|
|
NameSet waiting_hosts; /// hosts from task host list
|
|
|
|
NameSet finished_hosts; /// finished hosts from host list
|
|
|
|
NameSet ignoring_hosts; /// appeared hosts that are not in hosts list
|
|
|
|
Strings current_active_hosts; /// Hosts that were in active state at the last check
|
|
|
|
size_t num_hosts_finished = 0;
|
|
|
|
|
|
|
|
/// Save the first detected error and throw it at the end of execution
|
|
|
|
std::unique_ptr<Exception> first_exception;
|
|
|
|
|
|
|
|
Int64 timeout_seconds = 120;
|
|
|
|
bool by_hostname = true;
|
|
|
|
bool throw_on_timeout = true;
|
|
|
|
bool timeout_exceeded = false;
|
|
|
|
};
|
|
|
|
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
/// Builds a BlockIO whose pipeline reports the execution status of the distributed DDL task stored at `node_path`.
/// An empty BlockIO is returned when distributed_ddl_task_timeout is 0.
/// With output mode NONE the pipeline is completed with an EmptySink.
BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & entry, ContextPtr context, const std::optional<Strings> & hosts_to_wait)
{
    BlockIO result;

    const auto & settings = context->getSettingsRef();
    if (settings.distributed_ddl_task_timeout == 0)
        return result;

    result.pipeline = QueryPipeline(std::make_shared<DDLQueryStatusSource>(node_path, entry, context, hosts_to_wait));

    const bool discard_output = settings.distributed_ddl_output_mode == DistributedDDLOutputMode::NONE;
    if (discard_output)
    {
        auto sink = std::make_shared<EmptySink>(result.pipeline.getHeader());
        result.pipeline.complete(std::move(sink));
    }

    return result;
}
|
|
|
|
|
2021-07-19 16:46:58 +00:00
|
|
|
/// Describes the columns emitted by DDLQueryStatusSource.
/// "status" and "error" are Nullable unless the output mode is THROW or NONE.
/// The "port" column is dropped when `hosts_to_wait` is true.
static Block getSampleBlock(ContextPtr context_, bool hosts_to_wait)
{
    auto output_mode = context_->getSettingsRef().distributed_ddl_output_mode;
    const bool keep_types_as_is = output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE;

    auto nullable_if_needed = [keep_types_as_is](const DataTypePtr & type) -> DataTypePtr
    {
        return keep_types_as_is ? type : std::make_shared<DataTypeNullable>(type);
    };

    DataTypePtr status_type = nullable_if_needed(std::make_shared<DataTypeInt64>());
    DataTypePtr error_type = nullable_if_needed(std::make_shared<DataTypeString>());

    Block sample = Block{
        {std::make_shared<DataTypeString>(), "host"},
        {std::make_shared<DataTypeUInt16>(), "port"},
        {status_type, "status"},
        {error_type, "error"},
        {std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
        {std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
    };

    if (hosts_to_wait)
        sample.erase("port");

    return sample;
}
|
|
|
|
|
2021-07-17 21:45:07 +00:00
|
|
|
/// Prepares the set of hosts to wait for: either the explicitly passed `hosts_to_wait`
/// (host ids are then reported verbatim, without a port column) or all hosts of the task entry.
/// The timeout and the behaviour on timeout are read from the settings of `context_`.
DDLQueryStatusSource::DDLQueryStatusSource(
    const String & zk_node_path, const DDLLogEntry & entry, ContextPtr context_, const std::optional<Strings> & hosts_to_wait)
    : ISource(getSampleBlock(context_, hosts_to_wait.has_value()))
    , node_path(zk_node_path)
    , context(context_)
    , watch(CLOCK_MONOTONIC_COARSE)
    , log(&Poco::Logger::get("DDLQueryStatusSource"))
{
    const auto & settings = context->getSettingsRef();

    auto mode = settings.distributed_ddl_output_mode;
    throw_on_timeout = mode == DistributedDDLOutputMode::THROW || mode == DistributedDDLOutputMode::NONE;

    by_hostname = !hosts_to_wait.has_value();
    if (by_hostname)
    {
        for (const HostID & task_host : entry.hosts)
            waiting_hosts.emplace(task_host.toString());
    }
    else
    {
        waiting_hosts = NameSet(hosts_to_wait->begin(), hosts_to_wait->end());
    }

    addTotalRowsApprox(waiting_hosts.size());
    timeout_seconds = settings.distributed_ddl_task_timeout;
}
|
|
|
|
|
|
|
|
std::pair<String, UInt16> DDLQueryStatusSource::parseHostAndPort(const String & host_id) const
|
2021-03-08 20:35:09 +00:00
|
|
|
{
|
|
|
|
String host = host_id;
|
|
|
|
UInt16 port = 0;
|
|
|
|
if (by_hostname)
|
|
|
|
{
|
|
|
|
auto host_and_port = Cluster::Address::fromString(host_id);
|
|
|
|
host = host_and_port.first;
|
|
|
|
port = host_and_port.second;
|
|
|
|
}
|
|
|
|
return {host, port};
|
2020-11-03 13:47:26 +00:00
|
|
|
}
|
|
|
|
|
2021-07-17 21:45:07 +00:00
|
|
|
/// Polls ZooKeeper until at least one more awaited host has finished and returns a chunk with one row
/// per newly finished host.
///
/// Returns an empty chunk when all hosts have finished, the source is cancelled, the timeout was exceeded
/// earlier, or an error was saved into first_exception. On timeout in a non-throwing mode it returns
/// rows with NULL status and error for every unfinished host instead.
Chunk DDLQueryStatusSource::generate()
{
    /// Seems like num_hosts_finished cannot be strictly greater than waiting_hosts.size()
    assert(num_hosts_finished <= waiting_hosts.size());

    const bool nothing_left_to_wait = num_hosts_finished >= waiting_hosts.size();
    if (nothing_left_to_wait || timeout_exceeded)
        return {};

    auto zookeeper = context->getZooKeeper();
    const fs::path task_dir(node_path);

    size_t attempt = 0;
    for (;;)
    {
        if (isCancelled())
            return {};

        const bool timed_out = timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds;
        if (timed_out)
        {
            timeout_exceeded = true;

            size_t hosts_remaining = waiting_hosts.size() - num_hosts_finished;
            size_t hosts_active = current_active_hosts.size();

            constexpr const char * msg_format = "Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
                                                "There are {} unfinished hosts ({} of them are currently active), "
                                                "they are going to execute the query in background";

            if (throw_on_timeout)
            {
                /// Keep only the first error; prepare() pushes it after all data has been read.
                if (!first_exception)
                    first_exception = std::make_unique<Exception>(
                        fmt::format(msg_format, node_path, timeout_seconds, hosts_remaining, hosts_active),
                        ErrorCodes::TIMEOUT_EXCEEDED);
                return {};
            }

            LOG_INFO(log, msg_format, node_path, timeout_seconds, hosts_remaining, hosts_active);

            NameSet still_running = waiting_hosts;
            for (const auto & done_host : finished_hosts)
                still_running.erase(done_host);

            /// Query is not finished on the rest hosts, so fill the corresponding rows with NULLs.
            MutableColumns null_rows = output.getHeader().cloneEmptyColumns();
            for (const String & host_id : still_running)
            {
                auto [host, port] = parseHostAndPort(host_id);

                size_t pos = 0;
                null_rows[pos++]->insert(host);
                if (by_hostname)
                    null_rows[pos++]->insert(port);
                null_rows[pos++]->insert(Field{}); /// status
                null_rows[pos++]->insert(Field{}); /// error
                null_rows[pos++]->insert(hosts_remaining);
                null_rows[pos++]->insert(hosts_active);
            }
            return Chunk(std::move(null_rows), still_running.size());
        }

        /// Do not sleep before the very first poll; afterwards wait 50 ms longer per attempt, up to 1 second.
        const bool is_very_first_poll = num_hosts_finished == 0 && attempt == 0;
        if (!is_very_first_poll)
            sleepForMilliseconds(std::min<size_t>(1000, 50 * (attempt + 1)));

        if (!zookeeper->exists(node_path))
        {
            /// Paradoxically, this exception will be throw even in case of "never_throw" mode.
            if (!first_exception)
                first_exception = std::make_unique<Exception>(
                    fmt::format(
                        "Cannot provide query execution status. The query's node {} has been deleted by the cleaner"
                        " since it was finished (or its lifetime is expired)",
                        node_path),
                    ErrorCodes::UNFINISHED);
            return {};
        }

        Strings just_finished = getNewAndUpdate(getChildrenAllowNoNode(zookeeper, task_dir / "finished"));
        ++attempt;
        if (just_finished.empty())
            continue;

        current_active_hosts = getChildrenAllowNoNode(zookeeper, task_dir / "active");

        MutableColumns rows = output.getHeader().cloneEmptyColumns();
        for (const String & host_id : just_finished)
        {
            /// This placeholder status is reported if the host's status cannot be read.
            ExecutionStatus status(-1, "Cannot obtain error message");
            String serialized_status;
            if (zookeeper->tryGet(task_dir / "finished" / host_id, serialized_status))
                status.tryDeserializeText(serialized_status);

            auto [host, port] = parseHostAndPort(host_id);

            if (status.code != 0 && !first_exception)
            {
                if (context->getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW)
                    first_exception = std::make_unique<Exception>(
                        fmt::format("There was an error on [{}:{}]: {}", host, port, status.message), status.code);
            }

            ++num_hosts_finished;

            size_t pos = 0;
            rows[pos++]->insert(host);
            if (by_hostname)
                rows[pos++]->insert(port);
            rows[pos++]->insert(status.code);
            rows[pos++]->insert(status.message);
            rows[pos++]->insert(waiting_hosts.size() - num_hosts_finished);
            rows[pos++]->insert(current_active_hosts.size());
        }

        return Chunk(std::move(rows), just_finished.size());
    }
}
|
2020-11-03 13:47:26 +00:00
|
|
|
|
2021-07-19 17:23:16 +00:00
|
|
|
/// This method is overloaded to throw exception after all data is read.
/// Exception is pushed into pipe (instead of simply being thrown) to ensure the order of data processing and exception.
IProcessor::Status DDLQueryStatusSource::prepare()
{
    /// While the source is still producing data, the default behaviour applies.
    if (!finished)
        return ISource::prepare();

    if (first_exception)
    {
        /// Wait until the output port can accept the exception.
        if (!output.canPush())
            return Status::PortFull;

        output.pushException(std::make_exception_ptr(*first_exception));
    }

    output.finish();
    return Status::Finished;
}
|
|
|
|
|
2021-07-17 21:45:07 +00:00
|
|
|
/// Lists the children of `node_path`. A missing node (ZNONODE) is not an error: the list is returned as
/// tryGetChildren left it (presumably empty). Any other ZooKeeper error is thrown as Coordination::Exception.
Strings DDLQueryStatusSource::getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
{
    Strings children;
    const Coordination::Error code = zookeeper->tryGetChildren(node_path, children);

    const bool is_tolerated = code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE;
    if (!is_tolerated)
        throw Coordination::Exception(code, node_path);

    return children;
}
|
|
|
|
|
2021-07-17 21:45:07 +00:00
|
|
|
/// Returns hosts from `current_list_of_finished_hosts` that are awaited and have not been reported before,
/// and records them in finished_hosts. Hosts that are not awaited are remembered in ignoring_hosts
/// and logged once.
Strings DDLQueryStatusSource::getNewAndUpdate(const Strings & current_list_of_finished_hosts)
{
    Strings newly_finished;

    for (const String & host : current_list_of_finished_hosts)
    {
        if (waiting_hosts.contains(host))
        {
            /// emplace() reports whether the host has been seen for the first time.
            const bool first_seen = finished_hosts.emplace(host).second;
            if (first_seen)
                newly_finished.emplace_back(host);
        }
        else
        {
            const bool first_seen = ignoring_hosts.emplace(host).second;
            if (first_seen)
            {
                LOG_INFO(log, "Unexpected host {} appeared in task {}", host, node_path);
            }
        }
    }

    return newly_finished;
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|