// ClickHouse/src/Interpreters/executeDDLQueryOnCluster.cpp

#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/queryToString.h>
#include <Access/AccessRightsElement.h>
#include <Access/ContextAccess.h>
#include <Common/Macros.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h>
#include <Processors/NullSink.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
/// Error codes used by the distributed DDL helpers below.
/// Only declared here (`extern`); the definitions are not part of this file.
namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
    extern const int TIMEOUT_EXCEEDED;
    extern const int UNFINISHED;
    extern const int QUERY_IS_PROHIBITED;
    extern const int LOGICAL_ERROR;
}
bool isSupportedAlterType(int type)
2020-11-03 13:47:26 +00:00
{
2021-01-26 17:51:25 +00:00
assert(type != ASTAlterCommand::NO_TYPE);
2020-11-03 13:47:26 +00:00
static const std::unordered_set<int> unsupported_alter_types{
2021-02-03 20:02:37 +00:00
/// It's dangerous, because it may duplicate data if executed on multiple replicas. We can allow it after #18978
2020-11-03 13:47:26 +00:00
ASTAlterCommand::ATTACH_PARTITION,
2021-01-26 17:51:25 +00:00
/// Usually followed by ATTACH PARTITION
2020-11-03 13:47:26 +00:00
ASTAlterCommand::FETCH_PARTITION,
2021-01-26 17:51:25 +00:00
/// Logical error
2020-11-03 13:47:26 +00:00
ASTAlterCommand::NO_TYPE,
};
return unsupported_alter_types.count(type) == 0;
}
/// Convenience overload: executes the ON CLUSTER query with an empty list of required access rights.
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context)
{
    return executeDDLQueryOnCluster(query_ptr_, context, {});
}
/// Overload taking the required access rights by const reference.
/// A copy is made because the rvalue overload it forwards to modifies the list
/// (it substitutes empty database names).
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, ContextPtr context, const AccessRightsElements & query_requires_access)
{
    return executeDDLQueryOnCluster(query_ptr, context, AccessRightsElements{query_requires_access});
}
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context, AccessRightsElements && query_requires_access)
2020-11-03 13:47:26 +00:00
{
/// Remove FORMAT <fmt> and INTO OUTFILE <file> if exists
ASTPtr query_ptr = query_ptr_->clone();
ASTQueryWithOutput::resetOutputASTIfExist(*query_ptr);
// XXX: serious design flaw since `ASTQueryWithOnCluster` is not inherited from `IAST`!
auto * query = dynamic_cast<ASTQueryWithOnCluster *>(query_ptr.get());
if (!query)
{
throw Exception("Distributed execution is not supported for such DDL queries", ErrorCodes::NOT_IMPLEMENTED);
}
if (!context->getSettingsRef().allow_distributed_ddl)
2020-11-03 13:47:26 +00:00
throw Exception("Distributed DDL queries are prohibited for the user", ErrorCodes::QUERY_IS_PROHIBITED);
if (const auto * query_alter = query_ptr->as<ASTAlterQuery>())
{
for (const auto & command : query_alter->command_list->children)
2020-11-03 13:47:26 +00:00
{
if (!isSupportedAlterType(command->as<ASTAlterCommand&>().type))
2020-11-03 13:47:26 +00:00
throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);
}
}
query->cluster = context->getMacros()->expand(query->cluster);
ClusterPtr cluster = context->getCluster(query->cluster);
DDLWorker & ddl_worker = context->getDDLWorker();
2020-11-03 13:47:26 +00:00
/// Enumerate hosts which will be used to send query.
Cluster::AddressesWithFailover shards = cluster->getShardsAddresses();
std::vector<HostID> hosts;
for (const auto & shard : shards)
{
for (const auto & addr : shard)
hosts.emplace_back(addr);
}
if (hosts.empty())
throw Exception("No hosts defined to execute distributed DDL query", ErrorCodes::LOGICAL_ERROR);
/// The current database in a distributed query need to be replaced with either
/// the local current database or a shard's default database.
SYSTEM RESTORE REPLICA replica [ON CLUSTER cluster] (#13652) * initial commit: add setting and stub * typo * added test stub * fix * wip merging new integration test and code proto * adding steps interpreters * adding firstly proposed solution (moving parts etc) * added checking zookeeper path existence * fixing the include * fixing and sorting includes * fixing outdated struct * fix the name * added ast ptr as level of indirection * fix ref * updating the changes * working on test stub * fix iterator -> reference * revert rocksdb submodule update * fixed show privileges test * updated the test stub * replaced rand() with thread_local_rng(), updated the tests updated the test fixed test config path test fix removed error messages fixed the test updated the test fixed string literal fixed literal typo: = * fixed the empty replica error message * updated the test and the code with logs * updated the possible test cases, updated * added the code/test milestone comments * updated the test (added more testcases) * replaced native assert with CH one * individual replicas recursive delete fix * updated the AS db.name AST * two small logging fixes * manually generated AST fixes * Updated the test, added the possible algo change * Some thoughts about optimizing the solution: ALTER MOVE PARTITION .. TO TABLE -> move to detached/ + ALTER ... ATTACH * fix * Removed the replica sync in test as it's invalid * Some test tweaks * tmp * Rewrote the algo by using the executeQuery instead of hand-crafting the ASTPtr. Two questions still active. * tr: logging active parts * Extracted the parts moving algo into a separate helper function * Fixed the test data and the queries slightly * Replaced query to system.parts to direct invocation, started building the test that breaks on various parts. 
* Added the case for tables when at least one replica is alive * Updated the test to test replicas restoration by detaching/attaching * Altered the test to check restoration without replica restart * Added the tables swap in the start if the server failed last time * Hotfix when only /replicas/replica... path was deleted * Restore ZK paths while creating a replicated MergeTree table * Updated the docs, fixed the algo for individual replicas restoration case * Initial parts table storage fix, tests sync fix * Reverted individual replica restoration to general algo * Slightly optimised getDataParts * Trying another solution with parts detaching * Rewrote algo without any steps, added ON CLUSTER support * Attaching parts from other replica on restoration * Getting part checksums from ZK * Removed ON CLUSTER, finished working solution * Multiple small changes after review * Fixing parallel test * Supporting rewritten form on cluster * Test fix * Moar logging * Using source replica as checksum provider * improve test, remove some code from parser * Trying solution with move to detached + forget * Moving all parts (not only Committed) to detached * Edited docs for RESTORE REPLICA * Re-merging * minor fixes Co-authored-by: Alexander Tokmakov <avtokmakov@yandex-team.ru>
2021-06-20 08:24:43 +00:00
bool need_replace_current_database = std::any_of(
query_requires_access.begin(),
query_requires_access.end(),
[](const AccessRightsElement & elem) { return elem.isEmptyDatabase(); });
2020-11-03 13:47:26 +00:00
bool use_local_default_database = false;
const String & current_database = context->getCurrentDatabase();
2020-11-03 13:47:26 +00:00
if (need_replace_current_database)
{
Strings shard_default_databases;
for (const auto & shard : shards)
{
for (const auto & addr : shard)
{
if (!addr.default_database.empty())
shard_default_databases.push_back(addr.default_database);
else
use_local_default_database = true;
}
}
std::sort(shard_default_databases.begin(), shard_default_databases.end());
shard_default_databases.erase(std::unique(shard_default_databases.begin(), shard_default_databases.end()), shard_default_databases.end());
assert(use_local_default_database || !shard_default_databases.empty());
if (use_local_default_database && !shard_default_databases.empty())
throw Exception("Mixed local default DB and shard default DB in DDL query", ErrorCodes::NOT_IMPLEMENTED);
if (use_local_default_database)
{
query_requires_access.replaceEmptyDatabase(current_database);
}
else
{
for (size_t i = 0; i != query_requires_access.size();)
{
auto & element = query_requires_access[i];
if (element.isEmptyDatabase())
{
query_requires_access.insert(query_requires_access.begin() + i + 1, shard_default_databases.size() - 1, element);
for (size_t j = 0; j != shard_default_databases.size(); ++j)
query_requires_access[i + j].replaceEmptyDatabase(shard_default_databases[j]);
i += shard_default_databases.size();
}
else
++i;
}
}
}
AddDefaultDatabaseVisitor visitor(current_database, !use_local_default_database);
visitor.visitDDL(query_ptr);
/// Check access rights, assume that all servers have the same users config
context->checkAccess(query_requires_access);
2020-11-03 13:47:26 +00:00
DDLLogEntry entry;
entry.hosts = std::move(hosts);
entry.query = queryToString(query_ptr);
entry.initiator = ddl_worker.getCommonHostID();
entry.setSettingsIfRequired(context);
2020-11-03 13:47:26 +00:00
String node_path = ddl_worker.enqueueQuery(entry);
return getDistributedDDLStatus(node_path, entry, context);
}
/// Builds the BlockIO that reports the execution status of an enqueued distributed DDL task.
///
/// @param node_path      Path of the task node, forwarded to DDLQueryStatusSource.
/// @param entry          The enqueued DDL log entry.
/// @param context        Query context; `distributed_ddl_task_timeout` and `distributed_ddl_output_mode` are read from it.
/// @param hosts_to_wait  Optional explicit list of hosts to wait for, forwarded to DDLQueryStatusSource.
/// @return               An empty BlockIO when the timeout setting is 0, otherwise a BlockIO whose pipeline
///                       reads from a DDLQueryStatusSource.
BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & entry, ContextPtr context, const std::optional<Strings> & hosts_to_wait)
{
    BlockIO io;
    if (context->getSettingsRef().distributed_ddl_task_timeout == 0)
        return io;

    ProcessorPtr processor = std::make_shared<DDLQueryStatusSource>(node_path, entry, context, hosts_to_wait);
    io.pipeline.init(Pipe{processor});

    /// In NONE output mode every stream of the pipeline is terminated with an EmptySink.
    if (context->getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::NONE)
        io.pipeline.setSinks([](const Block & header, QueryPipeline::StreamType){ return std::make_shared<EmptySink>(header); });

    return io;
}
/// Describes the header (column types and names) of the status table produced by DDLQueryStatusSource.
///
/// The "status" and "error" columns are Nullable unless the output mode is THROW or NONE.
/// When `hosts_to_wait` is true the "port" column is dropped from the header.
static Block getSampleBlock(ContextPtr context_, bool hosts_to_wait)
{
    auto mode = context_->getSettingsRef().distributed_ddl_output_mode;
    bool keep_plain_types = mode == DistributedDDLOutputMode::THROW || mode == DistributedDDLOutputMode::NONE;

    auto nullable_unless_plain = [keep_plain_types](const DataTypePtr & nested) -> DataTypePtr
    {
        if (!keep_plain_types)
            return std::make_shared<DataTypeNullable>(nested);
        return nested;
    };

    Block header = Block{
        {std::make_shared<DataTypeString>(), "host"},
        {std::make_shared<DataTypeUInt16>(), "port"},
        {nullable_unless_plain(std::make_shared<DataTypeInt64>()), "status"},
        {nullable_unless_plain(std::make_shared<DataTypeString>()), "error"},
        {std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
        {std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
    };

    if (hosts_to_wait)
        header.erase("port");

    return header;
}
/// Creates a source that reports per-host completion of a distributed DDL task.
///
/// @param zk_node_path   Path of the task node; stored as `node_path`.
/// @param entry          DDL log entry; its `hosts` are waited for when `hosts_to_wait` is not set.
/// @param context_       Query context; the output mode and the task timeout are read from its settings.
/// @param hosts_to_wait  Optional explicit set of host identifiers to wait for. When set, identifiers are
///                       not parsed as host:port (`by_hostname` is turned off) and the header has no "port" column.
DDLQueryStatusSource::DDLQueryStatusSource(
    const String & zk_node_path, const DDLLogEntry & entry, ContextPtr context_, const std::optional<Strings> & hosts_to_wait)
    : SourceWithProgress(getSampleBlock(context_, hosts_to_wait.has_value()), true)
    , node_path(zk_node_path)
    , context(context_)
    , watch(CLOCK_MONOTONIC_COARSE)
    , log(&Poco::Logger::get("DDLQueryStatusInputStream"))
{
    auto output_mode = context->getSettingsRef().distributed_ddl_output_mode;
    throw_on_timeout = output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE;

    if (hosts_to_wait)
    {
        waiting_hosts = NameSet(hosts_to_wait->begin(), hosts_to_wait->end());
        by_hostname = false;
    }
    else
    {
        for (const HostID & host : entry.hosts)
            waiting_hosts.emplace(host.toString());
    }

    /// One output row is expected per awaited host.
    addTotalRowsApprox(waiting_hosts.size());
    timeout_seconds = context->getSettingsRef().distributed_ddl_task_timeout;
}
std::pair<String, UInt16> DDLQueryStatusSource::parseHostAndPort(const String & host_id) const
{
String host = host_id;
UInt16 port = 0;
if (by_hostname)
{
auto host_and_port = Cluster::Address::fromString(host_id);
host = host_and_port.first;
port = host_and_port.second;
}
return {host, port};
2020-11-03 13:47:26 +00:00
}
/// Produces the next chunk of per-host status rows.
///
/// Polls the "finished" child of the task node until at least one new awaited host appears there,
/// then returns one row per newly finished host. Returns an empty chunk when all hosts have already
/// finished, when the timeout was already reported, or when the source is cancelled.
///
/// On timeout: throws TIMEOUT_EXCEEDED if `throw_on_timeout` is set; otherwise logs the message and
/// returns one row per unfinished host with empty (NULL) status and error fields.
/// Throws UNFINISHED if the task node no longer exists.
Chunk DDLQueryStatusSource::generate()
{
    bool all_hosts_finished = num_hosts_finished >= waiting_hosts.size();

    /// Seems like num_hosts_finished cannot be strictly greater than waiting_hosts.size()
    assert(num_hosts_finished <= waiting_hosts.size());

    if (all_hosts_finished || timeout_exceeded)
        return {};

    /// Appends one row to `columns`. The "port" column exists only when hosts are identified by host:port.
    auto append_row = [this](MutableColumns & columns, const String & host, UInt16 port,
                             const Field & status_code, const Field & message,
                             size_t hosts_remaining, size_t hosts_active)
    {
        size_t num = 0;
        columns[num++]->insert(host);
        if (by_hostname)
            columns[num++]->insert(port);
        columns[num++]->insert(status_code);
        columns[num++]->insert(message);
        columns[num++]->insert(hosts_remaining);
        columns[num++]->insert(hosts_active);
    };

    auto zookeeper = context->getZooKeeper();
    size_t try_number = 0;

    while (true)
    {
        if (isCancelled())
            return {};

        if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds)
        {
            size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
            size_t num_active_hosts = current_active_hosts.size();

            constexpr const char * msg_format = "Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
                                                "There are {} unfinished hosts ({} of them are currently active), "
                                                "they are going to execute the query in background";
            if (throw_on_timeout)
                throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, msg_format,
                                node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);

            /// Remember the timeout so that the next call returns an empty chunk.
            timeout_exceeded = true;
            LOG_INFO(log, msg_format, node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);

            NameSet unfinished_hosts = waiting_hosts;
            for (const auto & host_id : finished_hosts)
                unfinished_hosts.erase(host_id);

            /// Query is not finished on the rest hosts, so fill the corresponding rows with NULLs.
            MutableColumns columns = output.getHeader().cloneEmptyColumns();
            for (const String & host_id : unfinished_hosts)
            {
                auto [host, port] = parseHostAndPort(host_id);
                append_row(columns, host, port, Field{}, Field{}, num_unfinished_hosts, num_active_hosts);
            }
            return Chunk(std::move(columns), unfinished_hosts.size());
        }

        /// No delay before the very first poll; afterwards the delay grows with try_number, capped at 1 second.
        if (num_hosts_finished != 0 || try_number != 0)
        {
            sleepForMilliseconds(std::min<size_t>(1000, 50 * (try_number + 1)));
        }

        if (!zookeeper->exists(node_path))
        {
            throw Exception(ErrorCodes::UNFINISHED,
                            "Cannot provide query execution status. The query's node {} has been deleted by the cleaner"
                            " since it was finished (or its lifetime is expired)", node_path);
        }

        Strings new_hosts = getNewAndUpdate(getChildrenAllowNoNode(zookeeper, fs::path(node_path) / "finished"));
        ++try_number;
        if (new_hosts.empty())
            continue;

        current_active_hosts = getChildrenAllowNoNode(zookeeper, fs::path(node_path) / "active");

        MutableColumns columns = output.getHeader().cloneEmptyColumns();
        for (const String & host_id : new_hosts)
        {
            /// Default status, used when the host's status cannot be read from its "finished" child node.
            ExecutionStatus status(-1, "Cannot obtain error message");
            {
                String status_data;
                if (zookeeper->tryGet(fs::path(node_path) / "finished" / host_id, status_data))
                    status.tryDeserializeText(status_data);
            }

            auto [host, port] = parseHostAndPort(host_id);

            /// Only the first failure is remembered; prepare() decides whether to rethrow it.
            if (status.code != 0 && !first_exception)
                first_exception = std::make_unique<Exception>(status.code, "There was an error on [{}:{}]: {}", host, port, status.message);

            ++num_hosts_finished;

            append_row(columns, host, port, status.code, status.message,
                       waiting_hosts.size() - num_hosts_finished, current_active_hosts.size());
        }
        return Chunk(std::move(columns), new_hosts.size());
    }
}
/// Processor state hook.
///
/// While the source is not finished, defers to SourceWithProgress::prepare(). Once finished, pushes the
/// first remembered host error to the output port (unless the output mode is NEVER_THROW), then closes
/// the output and reports Finished.
IProcessor::Status DDLQueryStatusSource::prepare()
{
    if (!finished)
        return SourceWithProgress::prepare();

    bool throw_if_error_on_host = context->getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW;
    if (first_exception && throw_if_error_on_host)
        output.pushException(std::make_exception_ptr(*first_exception));

    output.finish();
    return Status::Finished;
}
/// Lists the children of a ZooKeeper node, treating a missing node as "no children".
///
/// @param zookeeper  ZooKeeper client to query.
/// @param node_path  Path of the node to list (this parameter hides the class member of the same name).
/// @return           The children names; empty if the node does not exist.
/// Throws Coordination::Exception for any result code other than ZOK and ZNONODE.
Strings DDLQueryStatusSource::getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
{
    Strings res;
    Coordination::Error code = zookeeper->tryGetChildren(node_path, res);
    if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
        throw Coordination::Exception(code, node_path);
    return res;
}
/// Returns the awaited hosts that appear in `current_list_of_finished_hosts` for the first time
/// and records them in `finished_hosts`.
///
/// Hosts that are not in `waiting_hosts` are skipped; each such host is logged once and remembered
/// in `ignoring_hosts` so that it is not logged again.
Strings DDLQueryStatusSource::getNewAndUpdate(const Strings & current_list_of_finished_hosts)
{
    Strings diff;
    for (const String & host : current_list_of_finished_hosts)
    {
        if (!waiting_hosts.count(host))
        {
            if (!ignoring_hosts.count(host))
            {
                ignoring_hosts.emplace(host);
                LOG_INFO(log, "Unexpected host {} appeared in task {}", host, node_path);
            }
            continue;
        }

        /// Report each awaited host only the first time it is seen as finished.
        if (!finished_hosts.count(host))
        {
            diff.emplace_back(host);
            finished_hosts.emplace(host);
        }
    }
    return diff;
}
}