mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-02 12:32:04 +00:00
split DDLWorker.cpp
This commit is contained in:
parent
d8ae9fcdb4
commit
cbcdee0cf9
81
src/Interpreters/DDLTask.cpp
Normal file
81
src/Interpreters/DDLTask.cpp
Normal file
@ -0,0 +1,81 @@
|
||||
#include <Interpreters/DDLTask.h>
|
||||
#include <Common/DNSResolver.h>
|
||||
#include <Common/isLocalAddress.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <Poco/Net/NetException.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNKNOWN_FORMAT_VERSION;
|
||||
}
|
||||
|
||||
HostID HostID::fromString(const String & host_port_str)
|
||||
{
|
||||
HostID res;
|
||||
std::tie(res.host_name, res.port) = Cluster::Address::fromString(host_port_str);
|
||||
return res;
|
||||
}
|
||||
|
||||
bool HostID::isLocalAddress(UInt16 clickhouse_port) const
|
||||
{
|
||||
try
|
||||
{
|
||||
return DB::isLocalAddress(DNSResolver::instance().resolveAddress(host_name, port), clickhouse_port);
|
||||
}
|
||||
catch (const Poco::Net::NetException &)
|
||||
{
|
||||
/// Avoid "Host not found" exceptions
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
String DDLLogEntry::toString() const
|
||||
{
|
||||
WriteBufferFromOwnString wb;
|
||||
|
||||
Strings host_id_strings(hosts.size());
|
||||
std::transform(hosts.begin(), hosts.end(), host_id_strings.begin(), HostID::applyToString);
|
||||
|
||||
auto version = CURRENT_VERSION;
|
||||
wb << "version: " << version << "\n";
|
||||
wb << "query: " << escape << query << "\n";
|
||||
wb << "hosts: " << host_id_strings << "\n";
|
||||
wb << "initiator: " << initiator << "\n";
|
||||
|
||||
return wb.str();
|
||||
}
|
||||
|
||||
void DDLLogEntry::parse(const String & data)
|
||||
{
|
||||
ReadBufferFromString rb(data);
|
||||
|
||||
int version;
|
||||
rb >> "version: " >> version >> "\n";
|
||||
|
||||
if (version != CURRENT_VERSION)
|
||||
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}", version);
|
||||
|
||||
Strings host_id_strings;
|
||||
rb >> "query: " >> escape >> query >> "\n";
|
||||
rb >> "hosts: " >> host_id_strings >> "\n";
|
||||
|
||||
if (!rb.eof())
|
||||
rb >> "initiator: " >> initiator >> "\n";
|
||||
else
|
||||
initiator.clear();
|
||||
|
||||
assertEOF(rb);
|
||||
|
||||
hosts.resize(host_id_strings.size());
|
||||
std::transform(host_id_strings.begin(), host_id_strings.end(), hosts.begin(), HostID::fromString);
|
||||
}
|
||||
|
||||
|
||||
}
|
88
src/Interpreters/DDLTask.h
Normal file
88
src/Interpreters/DDLTask.h
Normal file
@ -0,0 +1,88 @@
|
||||
#pragma once
|
||||
#include <Core/Types.h>
|
||||
#include <Interpreters/Cluster.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ASTQueryWithOnCluster;
|
||||
|
||||
struct HostID
|
||||
{
|
||||
String host_name;
|
||||
UInt16 port;
|
||||
|
||||
HostID() = default;
|
||||
|
||||
explicit HostID(const Cluster::Address & address)
|
||||
: host_name(address.host_name), port(address.port) {}
|
||||
|
||||
static HostID fromString(const String & host_port_str);
|
||||
|
||||
String toString() const
|
||||
{
|
||||
return Cluster::Address::toString(host_name, port);
|
||||
}
|
||||
|
||||
String readableString() const
|
||||
{
|
||||
return host_name + ":" + DB::toString(port);
|
||||
}
|
||||
|
||||
bool isLocalAddress(UInt16 clickhouse_port) const;
|
||||
|
||||
static String applyToString(const HostID & host_id)
|
||||
{
|
||||
return host_id.toString();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
struct DDLLogEntry
|
||||
{
|
||||
String query;
|
||||
std::vector<HostID> hosts;
|
||||
String initiator; // optional
|
||||
|
||||
static constexpr int CURRENT_VERSION = 1;
|
||||
|
||||
String toString() const;
|
||||
|
||||
void parse(const String & data);
|
||||
};
|
||||
|
||||
|
||||
struct DDLTask
|
||||
{
|
||||
/// Stages of task lifetime correspond ordering of these data fields:
|
||||
|
||||
/// Stage 1: parse entry
|
||||
String entry_name;
|
||||
String entry_path;
|
||||
DDLLogEntry entry;
|
||||
|
||||
/// Stage 2: resolve host_id and check that
|
||||
HostID host_id;
|
||||
String host_id_str;
|
||||
|
||||
/// Stage 3.1: parse query
|
||||
ASTPtr query;
|
||||
ASTQueryWithOnCluster * query_on_cluster = nullptr;
|
||||
|
||||
/// Stage 3.2: check cluster and find the host in cluster
|
||||
String cluster_name;
|
||||
ClusterPtr cluster;
|
||||
Cluster::Address address_in_cluster;
|
||||
size_t host_shard_num;
|
||||
size_t host_replica_num;
|
||||
|
||||
/// Stage 3.3: execute query
|
||||
ExecutionStatus execution_status;
|
||||
bool was_executed = false;
|
||||
|
||||
/// Stage 4: commit results to ZooKeeper
|
||||
};
|
||||
|
||||
|
||||
}
|
@ -1,4 +1,5 @@
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/DDLTask.h>
|
||||
#include <Parsers/ASTAlterQuery.h>
|
||||
#include <Parsers/ASTDropQuery.h>
|
||||
#include <Parsers/ASTOptimizeQuery.h>
|
||||
@ -9,37 +10,21 @@
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/StorageDistributed.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <Interpreters/executeQuery.h>
|
||||
#include <Interpreters/Cluster.h>
|
||||
#include <Interpreters/AddDefaultDatabaseVisitor.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Access/AccessRightsElement.h>
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Common/DNSResolver.h>
|
||||
#include <Common/Macros.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Common/ZooKeeper/KeeperException.h>
|
||||
#include <Common/isLocalAddress.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Poco/Timestamp.h>
|
||||
#include <Poco/Net/NetException.h>
|
||||
#include <common/sleep.h>
|
||||
#include <common/getFQDNOrHostName.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <random>
|
||||
#include <pcg_random.hpp>
|
||||
|
||||
@ -51,7 +36,6 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int UNKNOWN_FORMAT_VERSION;
|
||||
extern const int INCONSISTENT_CLUSTER_DEFINITION;
|
||||
extern const int TIMEOUT_EXCEEDED;
|
||||
extern const int UNKNOWN_TYPE_OF_QUERY;
|
||||
@ -60,141 +44,6 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
struct HostID
|
||||
{
|
||||
String host_name;
|
||||
UInt16 port;
|
||||
|
||||
HostID() = default;
|
||||
|
||||
explicit HostID(const Cluster::Address & address)
|
||||
: host_name(address.host_name), port(address.port) {}
|
||||
|
||||
static HostID fromString(const String & host_port_str)
|
||||
{
|
||||
HostID res;
|
||||
std::tie(res.host_name, res.port) = Cluster::Address::fromString(host_port_str);
|
||||
return res;
|
||||
}
|
||||
|
||||
String toString() const
|
||||
{
|
||||
return Cluster::Address::toString(host_name, port);
|
||||
}
|
||||
|
||||
String readableString() const
|
||||
{
|
||||
return host_name + ":" + DB::toString(port);
|
||||
}
|
||||
|
||||
bool isLocalAddress(UInt16 clickhouse_port) const
|
||||
{
|
||||
try
|
||||
{
|
||||
return DB::isLocalAddress(DNSResolver::instance().resolveAddress(host_name, port), clickhouse_port);
|
||||
}
|
||||
catch (const Poco::Net::NetException &)
|
||||
{
|
||||
/// Avoid "Host not found" exceptions
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
static String applyToString(const HostID & host_id)
|
||||
{
|
||||
return host_id.toString();
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
||||
struct DDLLogEntry
|
||||
{
|
||||
String query;
|
||||
std::vector<HostID> hosts;
|
||||
String initiator; // optional
|
||||
|
||||
static constexpr int CURRENT_VERSION = 1;
|
||||
|
||||
String toString()
|
||||
{
|
||||
WriteBufferFromOwnString wb;
|
||||
|
||||
Strings host_id_strings(hosts.size());
|
||||
std::transform(hosts.begin(), hosts.end(), host_id_strings.begin(), HostID::applyToString);
|
||||
|
||||
auto version = CURRENT_VERSION;
|
||||
wb << "version: " << version << "\n";
|
||||
wb << "query: " << escape << query << "\n";
|
||||
wb << "hosts: " << host_id_strings << "\n";
|
||||
wb << "initiator: " << initiator << "\n";
|
||||
|
||||
return wb.str();
|
||||
}
|
||||
|
||||
void parse(const String & data)
|
||||
{
|
||||
ReadBufferFromString rb(data);
|
||||
|
||||
int version;
|
||||
rb >> "version: " >> version >> "\n";
|
||||
|
||||
if (version != CURRENT_VERSION)
|
||||
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}", version);
|
||||
|
||||
Strings host_id_strings;
|
||||
rb >> "query: " >> escape >> query >> "\n";
|
||||
rb >> "hosts: " >> host_id_strings >> "\n";
|
||||
|
||||
if (!rb.eof())
|
||||
rb >> "initiator: " >> initiator >> "\n";
|
||||
else
|
||||
initiator.clear();
|
||||
|
||||
assertEOF(rb);
|
||||
|
||||
hosts.resize(host_id_strings.size());
|
||||
std::transform(host_id_strings.begin(), host_id_strings.end(), hosts.begin(), HostID::fromString);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
struct DDLTask
|
||||
{
|
||||
/// Stages of task lifetime correspond ordering of these data fields:
|
||||
|
||||
/// Stage 1: parse entry
|
||||
String entry_name;
|
||||
String entry_path;
|
||||
DDLLogEntry entry;
|
||||
|
||||
/// Stage 2: resolve host_id and check that
|
||||
HostID host_id;
|
||||
String host_id_str;
|
||||
|
||||
/// Stage 3.1: parse query
|
||||
ASTPtr query;
|
||||
ASTQueryWithOnCluster * query_on_cluster = nullptr;
|
||||
|
||||
/// Stage 3.2: check cluster and find the host in cluster
|
||||
String cluster_name;
|
||||
ClusterPtr cluster;
|
||||
Cluster::Address address_in_cluster;
|
||||
size_t host_shard_num;
|
||||
size_t host_replica_num;
|
||||
|
||||
/// Stage 3.3: execute query
|
||||
ExecutionStatus execution_status;
|
||||
bool was_executed = false;
|
||||
|
||||
/// Stage 4: commit results to ZooKeeper
|
||||
};
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
@ -293,21 +142,6 @@ std::unique_ptr<ZooKeeperLock> createSimpleZooKeeperLock(
|
||||
}
|
||||
|
||||
|
||||
static bool isSupportedAlterType(int type)
|
||||
{
|
||||
static const std::unordered_set<int> unsupported_alter_types{
|
||||
ASTAlterCommand::ATTACH_PARTITION,
|
||||
ASTAlterCommand::REPLACE_PARTITION,
|
||||
ASTAlterCommand::FETCH_PARTITION,
|
||||
ASTAlterCommand::FREEZE_PARTITION,
|
||||
ASTAlterCommand::FREEZE_ALL,
|
||||
ASTAlterCommand::NO_TYPE,
|
||||
};
|
||||
|
||||
return unsupported_alter_types.count(type) == 0;
|
||||
}
|
||||
|
||||
|
||||
DDLWorker::DDLWorker(int pool_size_, const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix)
|
||||
: context(context_)
|
||||
, log(&Poco::Logger::get("DDLWorker"))
|
||||
@ -1187,313 +1021,4 @@ void DDLWorker::runCleanupThread()
|
||||
}
|
||||
|
||||
|
||||
class DDLQueryStatusInputStream : public IBlockInputStream
|
||||
{
|
||||
public:
|
||||
|
||||
DDLQueryStatusInputStream(const String & zk_node_path, const DDLLogEntry & entry, const Context & context_)
|
||||
: node_path(zk_node_path), context(context_), watch(CLOCK_MONOTONIC_COARSE), log(&Poco::Logger::get("DDLQueryStatusInputStream"))
|
||||
{
|
||||
sample = Block{
|
||||
{std::make_shared<DataTypeString>(), "host"},
|
||||
{std::make_shared<DataTypeUInt16>(), "port"},
|
||||
{std::make_shared<DataTypeInt64>(), "status"},
|
||||
{std::make_shared<DataTypeString>(), "error"},
|
||||
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
|
||||
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
|
||||
};
|
||||
|
||||
for (const HostID & host: entry.hosts)
|
||||
waiting_hosts.emplace(host.toString());
|
||||
|
||||
addTotalRowsApprox(entry.hosts.size());
|
||||
|
||||
timeout_seconds = context.getSettingsRef().distributed_ddl_task_timeout;
|
||||
}
|
||||
|
||||
String getName() const override
|
||||
{
|
||||
return "DDLQueryStatusInputStream";
|
||||
}
|
||||
|
||||
Block getHeader() const override { return sample; }
|
||||
|
||||
Block readImpl() override
|
||||
{
|
||||
Block res;
|
||||
if (num_hosts_finished >= waiting_hosts.size())
|
||||
{
|
||||
if (first_exception)
|
||||
throw Exception(*first_exception);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
auto zookeeper = context.getZooKeeper();
|
||||
size_t try_number = 0;
|
||||
|
||||
while (res.rows() == 0)
|
||||
{
|
||||
if (isCancelled())
|
||||
{
|
||||
if (first_exception)
|
||||
throw Exception(*first_exception);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds)
|
||||
{
|
||||
size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
|
||||
size_t num_active_hosts = current_active_hosts.size();
|
||||
|
||||
|
||||
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
|
||||
"Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
|
||||
"There are {} unfinished hosts ({} of them are currently active), they are going to execute the query in background",
|
||||
node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);
|
||||
}
|
||||
|
||||
if (num_hosts_finished != 0 || try_number != 0)
|
||||
{
|
||||
sleepForMilliseconds(std::min<size_t>(1000, 50 * (try_number + 1)));
|
||||
}
|
||||
|
||||
/// TODO: add shared lock
|
||||
if (!zookeeper->exists(node_path))
|
||||
{
|
||||
throw Exception(ErrorCodes::UNFINISHED,
|
||||
"Cannot provide query execution status. The query's node {} has been deleted by the cleaner since it was finished (or its lifetime is expired)",
|
||||
node_path);
|
||||
}
|
||||
|
||||
Strings new_hosts = getNewAndUpdate(getChildrenAllowNoNode(zookeeper, node_path + "/finished"));
|
||||
++try_number;
|
||||
if (new_hosts.empty())
|
||||
continue;
|
||||
|
||||
current_active_hosts = getChildrenAllowNoNode(zookeeper, node_path + "/active");
|
||||
|
||||
MutableColumns columns = sample.cloneEmptyColumns();
|
||||
for (const String & host_id : new_hosts)
|
||||
{
|
||||
ExecutionStatus status(-1, "Cannot obtain error message");
|
||||
{
|
||||
String status_data;
|
||||
if (zookeeper->tryGet(node_path + "/finished/" + host_id, status_data))
|
||||
status.tryDeserializeText(status_data);
|
||||
}
|
||||
|
||||
auto [host, port] = Cluster::Address::fromString(host_id);
|
||||
|
||||
if (status.code != 0 && first_exception == nullptr)
|
||||
first_exception = std::make_unique<Exception>(status.code, "There was an error on [{}:{}]: {}", host, port, status.message);
|
||||
|
||||
++num_hosts_finished;
|
||||
|
||||
columns[0]->insert(host);
|
||||
columns[1]->insert(port);
|
||||
columns[2]->insert(status.code);
|
||||
columns[3]->insert(status.message);
|
||||
columns[4]->insert(waiting_hosts.size() - num_hosts_finished);
|
||||
columns[5]->insert(current_active_hosts.size());
|
||||
}
|
||||
res = sample.cloneWithColumns(std::move(columns));
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
Block getSampleBlock() const
|
||||
{
|
||||
return sample.cloneEmpty();
|
||||
}
|
||||
|
||||
~DDLQueryStatusInputStream() override = default;
|
||||
|
||||
private:
|
||||
|
||||
static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
|
||||
{
|
||||
Strings res;
|
||||
Coordination::Error code = zookeeper->tryGetChildren(node_path, res);
|
||||
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
|
||||
throw Coordination::Exception(code, node_path);
|
||||
return res;
|
||||
}
|
||||
|
||||
Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts)
|
||||
{
|
||||
Strings diff;
|
||||
for (const String & host : current_list_of_finished_hosts)
|
||||
{
|
||||
if (!waiting_hosts.count(host))
|
||||
{
|
||||
if (!ignoring_hosts.count(host))
|
||||
{
|
||||
ignoring_hosts.emplace(host);
|
||||
LOG_INFO(log, "Unexpected host {} appeared in task {}", host, node_path);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!finished_hosts.count(host))
|
||||
{
|
||||
diff.emplace_back(host);
|
||||
finished_hosts.emplace(host);
|
||||
}
|
||||
}
|
||||
|
||||
return diff;
|
||||
}
|
||||
|
||||
String node_path;
|
||||
const Context & context;
|
||||
Stopwatch watch;
|
||||
Poco::Logger * log;
|
||||
|
||||
Block sample;
|
||||
|
||||
NameSet waiting_hosts; /// hosts from task host list
|
||||
NameSet finished_hosts; /// finished hosts from host list
|
||||
NameSet ignoring_hosts; /// appeared hosts that are not in hosts list
|
||||
Strings current_active_hosts; /// Hosts that were in active state at the last check
|
||||
size_t num_hosts_finished = 0;
|
||||
|
||||
/// Save the first detected error and throw it at the end of execution
|
||||
std::unique_ptr<Exception> first_exception;
|
||||
|
||||
Int64 timeout_seconds = 120;
|
||||
};
|
||||
|
||||
|
||||
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & context, AccessRightsElements && query_requires_access, bool query_requires_grant_option)
|
||||
{
|
||||
/// Remove FORMAT <fmt> and INTO OUTFILE <file> if exists
|
||||
ASTPtr query_ptr = query_ptr_->clone();
|
||||
ASTQueryWithOutput::resetOutputASTIfExist(*query_ptr);
|
||||
|
||||
// XXX: serious design flaw since `ASTQueryWithOnCluster` is not inherited from `IAST`!
|
||||
auto * query = dynamic_cast<ASTQueryWithOnCluster *>(query_ptr.get());
|
||||
if (!query)
|
||||
{
|
||||
throw Exception("Distributed execution is not supported for such DDL queries", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
if (!context.getSettingsRef().allow_distributed_ddl)
|
||||
throw Exception("Distributed DDL queries are prohibited for the user", ErrorCodes::QUERY_IS_PROHIBITED);
|
||||
|
||||
if (const auto * query_alter = query_ptr->as<ASTAlterQuery>())
|
||||
{
|
||||
for (const auto & command : query_alter->command_list->commands)
|
||||
{
|
||||
if (!isSupportedAlterType(command->type))
|
||||
throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
}
|
||||
|
||||
query->cluster = context.getMacros()->expand(query->cluster);
|
||||
ClusterPtr cluster = context.getCluster(query->cluster);
|
||||
DDLWorker & ddl_worker = context.getDDLWorker();
|
||||
|
||||
/// Enumerate hosts which will be used to send query.
|
||||
Cluster::AddressesWithFailover shards = cluster->getShardsAddresses();
|
||||
std::vector<HostID> hosts;
|
||||
for (const auto & shard : shards)
|
||||
{
|
||||
for (const auto & addr : shard)
|
||||
hosts.emplace_back(addr);
|
||||
}
|
||||
|
||||
if (hosts.empty())
|
||||
throw Exception("No hosts defined to execute distributed DDL query", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
/// The current database in a distributed query need to be replaced with either
|
||||
/// the local current database or a shard's default database.
|
||||
bool need_replace_current_database
|
||||
= (std::find_if(
|
||||
query_requires_access.begin(),
|
||||
query_requires_access.end(),
|
||||
[](const AccessRightsElement & elem) { return elem.isEmptyDatabase(); })
|
||||
!= query_requires_access.end());
|
||||
|
||||
bool use_local_default_database = false;
|
||||
const String & current_database = context.getCurrentDatabase();
|
||||
|
||||
if (need_replace_current_database)
|
||||
{
|
||||
Strings shard_default_databases;
|
||||
for (const auto & shard : shards)
|
||||
{
|
||||
for (const auto & addr : shard)
|
||||
{
|
||||
if (!addr.default_database.empty())
|
||||
shard_default_databases.push_back(addr.default_database);
|
||||
else
|
||||
use_local_default_database = true;
|
||||
}
|
||||
}
|
||||
std::sort(shard_default_databases.begin(), shard_default_databases.end());
|
||||
shard_default_databases.erase(std::unique(shard_default_databases.begin(), shard_default_databases.end()), shard_default_databases.end());
|
||||
assert(use_local_default_database || !shard_default_databases.empty());
|
||||
|
||||
if (use_local_default_database && !shard_default_databases.empty())
|
||||
throw Exception("Mixed local default DB and shard default DB in DDL query", ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
if (use_local_default_database)
|
||||
{
|
||||
query_requires_access.replaceEmptyDatabase(current_database);
|
||||
}
|
||||
else
|
||||
{
|
||||
for (size_t i = 0; i != query_requires_access.size();)
|
||||
{
|
||||
auto & element = query_requires_access[i];
|
||||
if (element.isEmptyDatabase())
|
||||
{
|
||||
query_requires_access.insert(query_requires_access.begin() + i + 1, shard_default_databases.size() - 1, element);
|
||||
for (size_t j = 0; j != shard_default_databases.size(); ++j)
|
||||
query_requires_access[i + j].replaceEmptyDatabase(shard_default_databases[j]);
|
||||
i += shard_default_databases.size();
|
||||
}
|
||||
else
|
||||
++i;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
AddDefaultDatabaseVisitor visitor(current_database, !use_local_default_database);
|
||||
visitor.visitDDL(query_ptr);
|
||||
|
||||
/// Check access rights, assume that all servers have the same users config
|
||||
if (query_requires_grant_option)
|
||||
context.getAccess()->checkGrantOption(query_requires_access);
|
||||
else
|
||||
context.checkAccess(query_requires_access);
|
||||
|
||||
DDLLogEntry entry;
|
||||
entry.hosts = std::move(hosts);
|
||||
entry.query = queryToString(query_ptr);
|
||||
entry.initiator = ddl_worker.getCommonHostID();
|
||||
String node_path = ddl_worker.enqueueQuery(entry);
|
||||
|
||||
BlockIO io;
|
||||
if (context.getSettingsRef().distributed_ddl_task_timeout == 0)
|
||||
return io;
|
||||
|
||||
auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, context);
|
||||
io.in = std::move(stream);
|
||||
return io;
|
||||
}
|
||||
|
||||
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, const AccessRightsElements & query_requires_access, bool query_requires_grant_option)
|
||||
{
|
||||
return executeDDLQueryOnCluster(query_ptr, context, AccessRightsElements{query_requires_access}, query_requires_grant_option);
|
||||
}
|
||||
|
||||
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & context)
|
||||
{
|
||||
return executeDDLQueryOnCluster(query_ptr_, context, {});
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,11 +1,9 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/Cluster.h>
|
||||
#include <DataStreams/BlockIO.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/IStorage_fwd.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
@ -18,23 +16,22 @@ namespace zkutil
|
||||
class ZooKeeper;
|
||||
}
|
||||
|
||||
namespace Poco
|
||||
{
|
||||
class Logger;
|
||||
namespace Util { class AbstractConfiguration; }
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
class ASTAlterQuery;
|
||||
class AccessRightsElements;
|
||||
struct DDLLogEntry;
|
||||
struct DDLTask;
|
||||
using DDLTaskPtr = std::unique_ptr<DDLTask>;
|
||||
|
||||
|
||||
/// Pushes distributed DDL query to the queue
|
||||
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context);
|
||||
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, const AccessRightsElements & query_requires_access, bool query_requires_grant_option = false);
|
||||
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, AccessRightsElements && query_requires_access, bool query_requires_grant_option = false);
|
||||
|
||||
|
||||
class DDLWorker
|
||||
{
|
||||
public:
|
||||
@ -137,9 +134,6 @@ private:
|
||||
size_t max_tasks_in_queue = 1000;
|
||||
|
||||
ThreadGroupStatusPtr thread_group;
|
||||
|
||||
friend class DDLQueryStatusInputStream;
|
||||
friend struct DDLTask;
|
||||
};
|
||||
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
#include <Interpreters/InterpreterAlterQuery.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Interpreters/MutationsInterpreter.h>
|
||||
#include <Interpreters/AddDefaultDatabaseVisitor.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
@ -28,7 +28,8 @@
|
||||
#include <Storages/StorageInMemoryMetadata.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Interpreters/Cluster.h>
|
||||
#include <Interpreters/ExpressionAnalyzer.h>
|
||||
#include <Interpreters/InterpreterCreateQuery.h>
|
||||
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
|
||||
|
@ -2,7 +2,7 @@
|
||||
#include <Parsers/ASTCreateQuotaQuery.h>
|
||||
#include <Parsers/ASTRolesOrUsersSet.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Access/AccessControlManager.h>
|
||||
#include <Access/AccessFlags.h>
|
||||
#include <ext/range.h>
|
||||
|
@ -1,7 +1,7 @@
|
||||
#include <Interpreters/InterpreterCreateRoleQuery.h>
|
||||
#include <Parsers/ASTCreateRoleQuery.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Access/AccessControlManager.h>
|
||||
#include <Access/Role.h>
|
||||
|
||||
|
@ -4,7 +4,7 @@
|
||||
#include <Parsers/ASTRolesOrUsersSet.h>
|
||||
#include <Parsers/formatAST.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Access/AccessControlManager.h>
|
||||
#include <Access/AccessFlags.h>
|
||||
#include <boost/range/algorithm/sort.hpp>
|
||||
|
@ -2,7 +2,7 @@
|
||||
#include <Parsers/ASTCreateSettingsProfileQuery.h>
|
||||
#include <Parsers/ASTRolesOrUsersSet.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Access/AccessControlManager.h>
|
||||
#include <Access/SettingsProfile.h>
|
||||
#include <Access/AccessFlags.h>
|
||||
|
@ -1,7 +1,7 @@
|
||||
#include <Interpreters/InterpreterCreateUserQuery.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/InterpreterSetRoleQuery.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Parsers/ASTCreateUserQuery.h>
|
||||
#include <Parsers/ASTUserNameWithHost.h>
|
||||
#include <Parsers/ASTRolesOrUsersSet.h>
|
||||
|
@ -2,7 +2,7 @@
|
||||
#include <Parsers/ASTDropAccessEntityQuery.h>
|
||||
#include <Parsers/ASTRowPolicyName.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Access/AccessControlManager.h>
|
||||
#include <Access/AccessFlags.h>
|
||||
#include <Access/User.h>
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
#include <Databases/IDatabase.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Interpreters/InterpreterDropQuery.h>
|
||||
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||
#include <Access/AccessRightsElement.h>
|
||||
|
@ -2,7 +2,7 @@
|
||||
#include <Parsers/ASTGrantQuery.h>
|
||||
#include <Parsers/ASTRolesOrUsersSet.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Access/AccessControlManager.h>
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Access/RolesOrUsersSet.h>
|
||||
|
@ -2,7 +2,7 @@
|
||||
#include <Parsers/ASTKillQueryQuery.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Interpreters/ProcessList.h>
|
||||
#include <Interpreters/executeQuery.h>
|
||||
#include <Interpreters/CancellationCode.h>
|
||||
|
@ -1,7 +1,7 @@
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Parsers/ASTOptimizeQuery.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Interpreters/InterpreterOptimizeQuery.h>
|
||||
#include <Access/AccessRightsElement.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
|
@ -3,7 +3,7 @@
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/InterpreterRenameQuery.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Access/AccessRightsElement.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Databases/DatabaseReplicated.h>
|
||||
|
@ -14,7 +14,7 @@
|
||||
#include <Interpreters/InterpreterCreateQuery.h>
|
||||
#include <Interpreters/InterpreterRenameQuery.h>
|
||||
#include <Interpreters/QueryLog.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Interpreters/PartLog.h>
|
||||
#include <Interpreters/QueryThreadLog.h>
|
||||
#include <Interpreters/TraceLog.h>
|
||||
|
317
src/Interpreters/executeDDLQueryOnCluster.cpp
Normal file
317
src/Interpreters/executeDDLQueryOnCluster.cpp
Normal file
@ -0,0 +1,317 @@
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Interpreters/DDLWorker.h>
|
||||
#include <Interpreters/DDLTask.h>
|
||||
#include <Interpreters/AddDefaultDatabaseVisitor.h>
|
||||
#include <Parsers/ASTQueryWithOutput.h>
|
||||
#include <Parsers/ASTQueryWithOnCluster.h>
|
||||
#include <Parsers/ASTAlterQuery.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <Access/AccessRightsElement.h>
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Common/Macros.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int TIMEOUT_EXCEEDED;
|
||||
extern const int UNFINISHED;
|
||||
extern const int QUERY_IS_PROHIBITED;
|
||||
}
|
||||
|
||||
static bool isSupportedAlterType(int type)
|
||||
{
|
||||
static const std::unordered_set<int> unsupported_alter_types{
|
||||
ASTAlterCommand::ATTACH_PARTITION,
|
||||
ASTAlterCommand::REPLACE_PARTITION,
|
||||
ASTAlterCommand::FETCH_PARTITION,
|
||||
ASTAlterCommand::FREEZE_PARTITION,
|
||||
ASTAlterCommand::FREEZE_ALL,
|
||||
ASTAlterCommand::NO_TYPE,
|
||||
};
|
||||
|
||||
return unsupported_alter_types.count(type) == 0;
|
||||
}
|
||||
|
||||
|
||||
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & context)
|
||||
{
|
||||
return executeDDLQueryOnCluster(query_ptr_, context, {});
|
||||
}
|
||||
|
||||
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, const AccessRightsElements & query_requires_access, bool query_requires_grant_option)
|
||||
{
|
||||
return executeDDLQueryOnCluster(query_ptr, context, AccessRightsElements{query_requires_access}, query_requires_grant_option);
|
||||
}
|
||||
|
||||
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & context, AccessRightsElements && query_requires_access, bool query_requires_grant_option)
|
||||
{
|
||||
/// Remove FORMAT <fmt> and INTO OUTFILE <file> if exists
|
||||
ASTPtr query_ptr = query_ptr_->clone();
|
||||
ASTQueryWithOutput::resetOutputASTIfExist(*query_ptr);
|
||||
|
||||
// XXX: serious design flaw since `ASTQueryWithOnCluster` is not inherited from `IAST`!
|
||||
auto * query = dynamic_cast<ASTQueryWithOnCluster *>(query_ptr.get());
|
||||
if (!query)
|
||||
{
|
||||
throw Exception("Distributed execution is not supported for such DDL queries", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
if (!context.getSettingsRef().allow_distributed_ddl)
|
||||
throw Exception("Distributed DDL queries are prohibited for the user", ErrorCodes::QUERY_IS_PROHIBITED);
|
||||
|
||||
if (const auto * query_alter = query_ptr->as<ASTAlterQuery>())
|
||||
{
|
||||
for (const auto & command : query_alter->command_list->commands)
|
||||
{
|
||||
if (!isSupportedAlterType(command->type))
|
||||
throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
}
|
||||
|
||||
query->cluster = context.getMacros()->expand(query->cluster);
|
||||
ClusterPtr cluster = context.getCluster(query->cluster);
|
||||
DDLWorker & ddl_worker = context.getDDLWorker();
|
||||
|
||||
/// Enumerate hosts which will be used to send query.
|
||||
Cluster::AddressesWithFailover shards = cluster->getShardsAddresses();
|
||||
std::vector<HostID> hosts;
|
||||
for (const auto & shard : shards)
|
||||
{
|
||||
for (const auto & addr : shard)
|
||||
hosts.emplace_back(addr);
|
||||
}
|
||||
|
||||
if (hosts.empty())
|
||||
throw Exception("No hosts defined to execute distributed DDL query", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
/// The current database in a distributed query need to be replaced with either
|
||||
/// the local current database or a shard's default database.
|
||||
bool need_replace_current_database
|
||||
= (std::find_if(
|
||||
query_requires_access.begin(),
|
||||
query_requires_access.end(),
|
||||
[](const AccessRightsElement & elem) { return elem.isEmptyDatabase(); })
|
||||
!= query_requires_access.end());
|
||||
|
||||
bool use_local_default_database = false;
|
||||
const String & current_database = context.getCurrentDatabase();
|
||||
|
||||
if (need_replace_current_database)
|
||||
{
|
||||
Strings shard_default_databases;
|
||||
for (const auto & shard : shards)
|
||||
{
|
||||
for (const auto & addr : shard)
|
||||
{
|
||||
if (!addr.default_database.empty())
|
||||
shard_default_databases.push_back(addr.default_database);
|
||||
else
|
||||
use_local_default_database = true;
|
||||
}
|
||||
}
|
||||
std::sort(shard_default_databases.begin(), shard_default_databases.end());
|
||||
shard_default_databases.erase(std::unique(shard_default_databases.begin(), shard_default_databases.end()), shard_default_databases.end());
|
||||
assert(use_local_default_database || !shard_default_databases.empty());
|
||||
|
||||
if (use_local_default_database && !shard_default_databases.empty())
|
||||
throw Exception("Mixed local default DB and shard default DB in DDL query", ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
if (use_local_default_database)
|
||||
{
|
||||
query_requires_access.replaceEmptyDatabase(current_database);
|
||||
}
|
||||
else
|
||||
{
|
||||
for (size_t i = 0; i != query_requires_access.size();)
|
||||
{
|
||||
auto & element = query_requires_access[i];
|
||||
if (element.isEmptyDatabase())
|
||||
{
|
||||
query_requires_access.insert(query_requires_access.begin() + i + 1, shard_default_databases.size() - 1, element);
|
||||
for (size_t j = 0; j != shard_default_databases.size(); ++j)
|
||||
query_requires_access[i + j].replaceEmptyDatabase(shard_default_databases[j]);
|
||||
i += shard_default_databases.size();
|
||||
}
|
||||
else
|
||||
++i;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
AddDefaultDatabaseVisitor visitor(current_database, !use_local_default_database);
|
||||
visitor.visitDDL(query_ptr);
|
||||
|
||||
/// Check access rights, assume that all servers have the same users config
|
||||
if (query_requires_grant_option)
|
||||
context.getAccess()->checkGrantOption(query_requires_access);
|
||||
else
|
||||
context.checkAccess(query_requires_access);
|
||||
|
||||
DDLLogEntry entry;
|
||||
entry.hosts = std::move(hosts);
|
||||
entry.query = queryToString(query_ptr);
|
||||
entry.initiator = ddl_worker.getCommonHostID();
|
||||
String node_path = ddl_worker.enqueueQuery(entry);
|
||||
|
||||
BlockIO io;
|
||||
if (context.getSettingsRef().distributed_ddl_task_timeout == 0)
|
||||
return io;
|
||||
|
||||
auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, context);
|
||||
io.in = std::move(stream);
|
||||
return io;
|
||||
}
|
||||
|
||||
|
||||
DDLQueryStatusInputStream::DDLQueryStatusInputStream(const String & zk_node_path, const DDLLogEntry & entry, const Context & context_)
|
||||
: node_path(zk_node_path)
|
||||
, context(context_)
|
||||
, watch(CLOCK_MONOTONIC_COARSE)
|
||||
, log(&Poco::Logger::get("DDLQueryStatusInputStream"))
|
||||
{
|
||||
sample = Block{
|
||||
{std::make_shared<DataTypeString>(), "host"},
|
||||
{std::make_shared<DataTypeUInt16>(), "port"},
|
||||
{std::make_shared<DataTypeInt64>(), "status"},
|
||||
{std::make_shared<DataTypeString>(), "error"},
|
||||
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
|
||||
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
|
||||
};
|
||||
|
||||
for (const HostID & host: entry.hosts)
|
||||
waiting_hosts.emplace(host.toString());
|
||||
|
||||
addTotalRowsApprox(entry.hosts.size());
|
||||
|
||||
timeout_seconds = context.getSettingsRef().distributed_ddl_task_timeout;
|
||||
}
|
||||
|
||||
Block DDLQueryStatusInputStream::readImpl()
|
||||
{
|
||||
Block res;
|
||||
if (num_hosts_finished >= waiting_hosts.size())
|
||||
{
|
||||
if (first_exception)
|
||||
throw Exception(*first_exception);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
auto zookeeper = context.getZooKeeper();
|
||||
size_t try_number = 0;
|
||||
|
||||
while (res.rows() == 0)
|
||||
{
|
||||
if (isCancelled())
|
||||
{
|
||||
if (first_exception)
|
||||
throw Exception(*first_exception);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds)
|
||||
{
|
||||
size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
|
||||
size_t num_active_hosts = current_active_hosts.size();
|
||||
|
||||
|
||||
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
|
||||
"Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
|
||||
"There are {} unfinished hosts ({} of them are currently active), they are going to execute the query in background",
|
||||
node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);
|
||||
}
|
||||
|
||||
if (num_hosts_finished != 0 || try_number != 0)
|
||||
{
|
||||
sleepForMilliseconds(std::min<size_t>(1000, 50 * (try_number + 1)));
|
||||
}
|
||||
|
||||
/// TODO: add shared lock
|
||||
if (!zookeeper->exists(node_path))
|
||||
{
|
||||
throw Exception(ErrorCodes::UNFINISHED,
|
||||
"Cannot provide query execution status. The query's node {} has been deleted by the cleaner since it was finished (or its lifetime is expired)",
|
||||
node_path);
|
||||
}
|
||||
|
||||
Strings new_hosts = getNewAndUpdate(getChildrenAllowNoNode(zookeeper, node_path + "/finished"));
|
||||
++try_number;
|
||||
if (new_hosts.empty())
|
||||
continue;
|
||||
|
||||
current_active_hosts = getChildrenAllowNoNode(zookeeper, node_path + "/active");
|
||||
|
||||
MutableColumns columns = sample.cloneEmptyColumns();
|
||||
for (const String & host_id : new_hosts)
|
||||
{
|
||||
ExecutionStatus status(-1, "Cannot obtain error message");
|
||||
{
|
||||
String status_data;
|
||||
if (zookeeper->tryGet(node_path + "/finished/" + host_id, status_data))
|
||||
status.tryDeserializeText(status_data);
|
||||
}
|
||||
|
||||
auto [host, port] = Cluster::Address::fromString(host_id);
|
||||
|
||||
if (status.code != 0 && first_exception == nullptr)
|
||||
first_exception = std::make_unique<Exception>(status.code, "There was an error on [{}:{}]: {}", host, port, status.message);
|
||||
|
||||
++num_hosts_finished;
|
||||
|
||||
columns[0]->insert(host);
|
||||
columns[1]->insert(port);
|
||||
columns[2]->insert(status.code);
|
||||
columns[3]->insert(status.message);
|
||||
columns[4]->insert(waiting_hosts.size() - num_hosts_finished);
|
||||
columns[5]->insert(current_active_hosts.size());
|
||||
}
|
||||
res = sample.cloneWithColumns(std::move(columns));
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
Strings DDLQueryStatusInputStream::getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
|
||||
{
|
||||
Strings res;
|
||||
Coordination::Error code = zookeeper->tryGetChildren(node_path, res);
|
||||
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
|
||||
throw Coordination::Exception(code, node_path);
|
||||
return res;
|
||||
}
|
||||
|
||||
Strings DDLQueryStatusInputStream::getNewAndUpdate(const Strings & current_list_of_finished_hosts)
|
||||
{
|
||||
Strings diff;
|
||||
for (const String & host : current_list_of_finished_hosts)
|
||||
{
|
||||
if (!waiting_hosts.count(host))
|
||||
{
|
||||
if (!ignoring_hosts.count(host))
|
||||
{
|
||||
ignoring_hosts.emplace(host);
|
||||
LOG_INFO(log, "Unexpected host {} appeared in task {}", host, node_path);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!finished_hosts.count(host))
|
||||
{
|
||||
diff.emplace_back(host);
|
||||
finished_hosts.emplace(host);
|
||||
}
|
||||
}
|
||||
|
||||
return diff;
|
||||
}
|
||||
|
||||
|
||||
}
|
63
src/Interpreters/executeDDLQueryOnCluster.h
Normal file
63
src/Interpreters/executeDDLQueryOnCluster.h
Normal file
@ -0,0 +1,63 @@
|
||||
#pragma once
|
||||
#include <DataStreams/BlockIO.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
|
||||
namespace zkutil
|
||||
{
|
||||
class ZooKeeper;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
class AccessRightsElements;
|
||||
struct DDLLogEntry;
|
||||
|
||||
|
||||
/// Pushes distributed DDL query to the queue.
|
||||
/// Returns DDLQueryStatusInputStream, which reads results of query execution on each host in the cluster.
|
||||
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context);
|
||||
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, const AccessRightsElements & query_requires_access, bool query_requires_grant_option = false);
|
||||
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, AccessRightsElements && query_requires_access, bool query_requires_grant_option = false);
|
||||
|
||||
|
||||
class DDLQueryStatusInputStream : public IBlockInputStream
|
||||
{
|
||||
public:
|
||||
DDLQueryStatusInputStream(const String & zk_node_path, const DDLLogEntry & entry, const Context & context_);
|
||||
|
||||
String getName() const override { return "DDLQueryStatusInputStream"; }
|
||||
|
||||
Block getHeader() const override { return sample; }
|
||||
|
||||
Block getSampleBlock() const { return sample.cloneEmpty(); }
|
||||
|
||||
Block readImpl() override;
|
||||
|
||||
private:
|
||||
|
||||
static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path);
|
||||
|
||||
Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts);
|
||||
|
||||
String node_path;
|
||||
const Context & context;
|
||||
Stopwatch watch;
|
||||
Poco::Logger * log;
|
||||
|
||||
Block sample;
|
||||
|
||||
NameSet waiting_hosts; /// hosts from task host list
|
||||
NameSet finished_hosts; /// finished hosts from host list
|
||||
NameSet ignoring_hosts; /// appeared hosts that are not in hosts list
|
||||
Strings current_active_hosts; /// Hosts that were in active state at the last check
|
||||
size_t num_hosts_finished = 0;
|
||||
|
||||
/// Save the first detected error and throw it at the end of execution
|
||||
std::unique_ptr<Exception> first_exception;
|
||||
|
||||
Int64 timeout_seconds = 120;
|
||||
};
|
||||
|
||||
}
|
@ -45,11 +45,13 @@ SRCS(
|
||||
CrossToInnerJoinVisitor.cpp
|
||||
DatabaseAndTableWithAlias.cpp
|
||||
DatabaseCatalog.cpp
|
||||
DDLTask.cpp
|
||||
DDLWorker.cpp
|
||||
DictionaryReader.cpp
|
||||
DNSCacheUpdater.cpp
|
||||
EmbeddedDictionaries.cpp
|
||||
evaluateConstantExpression.cpp
|
||||
executeDDLQueryOnCluster.cpp
|
||||
executeQuery.cpp
|
||||
ExecuteScalarSubqueriesVisitor.cpp
|
||||
ExpressionActions.cpp
|
||||
|
Loading…
Reference in New Issue
Block a user