2017-04-13 13:42:29 +00:00
|
|
|
#include <Interpreters/DDLWorker.h>
|
2017-04-13 16:12:56 +00:00
|
|
|
|
|
|
|
#include <Parsers/ASTAlterQuery.h>
|
2017-04-21 12:39:28 +00:00
|
|
|
#include <Parsers/ASTQueryWithOnCluster.h>
|
2017-04-25 15:21:03 +00:00
|
|
|
#include <Parsers/ParserQuery.h>
|
|
|
|
#include <Parsers/parseQuery.h>
|
|
|
|
#include <Parsers/queryToString.h>
|
2017-04-13 16:12:56 +00:00
|
|
|
|
2017-04-17 17:04:31 +00:00
|
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
#include <IO/ReadHelpers.h>
|
|
|
|
#include <IO/Operators.h>
|
|
|
|
#include <IO/ReadBufferFromString.h>
|
|
|
|
|
2017-04-25 15:21:03 +00:00
|
|
|
#include <Storages/IStorage.h>
|
2017-04-27 15:19:11 +00:00
|
|
|
#include <DataStreams/OneBlockInputStream.h>
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-13 13:42:29 +00:00
|
|
|
#include <Interpreters/executeQuery.h>
|
2017-04-13 16:12:56 +00:00
|
|
|
#include <Interpreters/Cluster.h>
|
2017-04-27 15:19:11 +00:00
|
|
|
|
2017-04-13 16:12:56 +00:00
|
|
|
#include <Common/getFQDNOrHostName.h>
|
2017-04-27 15:19:11 +00:00
|
|
|
#include <Common/setThreadName.h>
|
|
|
|
#include <Common/Stopwatch.h>
|
2017-04-13 16:12:56 +00:00
|
|
|
|
|
|
|
#include <DataTypes/DataTypesNumber.h>
|
|
|
|
#include <DataTypes/DataTypeString.h>
|
|
|
|
#include <DataTypes/DataTypeArray.h>
|
|
|
|
#include <Columns/ColumnsNumber.h>
|
|
|
|
#include <Columns/ColumnString.h>
|
|
|
|
#include <Columns/ColumnArray.h>
|
2017-04-13 13:42:29 +00:00
|
|
|
|
2017-06-19 20:06:35 +00:00
|
|
|
#include <Common/ZooKeeper/ZooKeeper.h>
|
|
|
|
#include <Common/ZooKeeper/Lock.h>
|
2017-04-19 14:21:27 +00:00
|
|
|
#include <Poco/Timestamp.h>
|
|
|
|
|
2017-05-30 11:49:17 +00:00
|
|
|
#include <experimental/optional>
|
|
|
|
|
2017-04-13 13:42:29 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes referenced by the code in this file.
/// These are declarations only; the constants themselves are defined elsewhere.
namespace ErrorCodes
{
    extern const int UNKNOWN_ELEMENT_IN_CONFIG;
    extern const int INVALID_CONFIG_PARAMETER;
    extern const int UNKNOWN_FORMAT_VERSION;
    extern const int INCONSISTENT_TABLE_ACCROSS_SHARDS;
    extern const int INCONSISTENT_CLUSTER_DEFINITION;
    extern const int TIMEOUT_EXCEEDED;
    extern const int UNFINISHED;
    /// Also thrown / compared against in this file, so declare them explicitly
    /// instead of depending on a declaration leaking in from some included header.
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}
|
|
|
|
|
|
|
|
|
2017-05-31 14:01:08 +00:00
|
|
|
/// cleanupQueue() removes a queue node once its ZooKeeper modification time is older than this many seconds.
const size_t DDLWorker::node_max_lifetime_seconds = 7 * 24 * 60 * 60; // week
|
|
|
|
/// cleanupQueue() returns immediately when fewer than this many seconds have passed since the previous cleanup.
const size_t DDLWorker::cleanup_min_period_seconds = 60; // minute
|
|
|
|
|
|
|
|
|
2017-04-17 17:04:31 +00:00
|
|
|
struct DDLLogEntry
|
|
|
|
{
|
|
|
|
String query;
|
|
|
|
Strings hosts;
|
2017-05-30 11:49:17 +00:00
|
|
|
String initiator; // optional
|
2017-04-17 17:04:31 +00:00
|
|
|
|
2017-06-01 09:22:22 +00:00
|
|
|
static constexpr int CURRENT_VERSION = 1;
|
2017-04-17 17:04:31 +00:00
|
|
|
|
|
|
|
String toString()
|
|
|
|
{
|
2017-07-31 21:39:24 +00:00
|
|
|
WriteBufferFromOwnString wb;
|
|
|
|
|
|
|
|
auto version = CURRENT_VERSION;
|
|
|
|
wb << "version: " << version << "\n";
|
|
|
|
wb << "query: " << escape << query << "\n";
|
|
|
|
wb << "hosts: " << hosts << "\n";
|
|
|
|
wb << "initiator: " << initiator << "\n";
|
|
|
|
|
|
|
|
return wb.str();
|
2017-04-17 17:04:31 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void parse(const String & data)
|
|
|
|
{
|
|
|
|
ReadBufferFromString rb(data);
|
|
|
|
|
2017-05-30 11:49:17 +00:00
|
|
|
int version;
|
|
|
|
rb >> "version: " >> version >> "\n";
|
|
|
|
|
2017-04-17 17:04:31 +00:00
|
|
|
if (version != CURRENT_VERSION)
|
2017-06-13 04:45:30 +00:00
|
|
|
throw Exception("Unknown DDLLogEntry format version: " + DB::toString(version), ErrorCodes::UNKNOWN_FORMAT_VERSION);
|
2017-04-17 17:04:31 +00:00
|
|
|
|
2017-05-31 14:01:08 +00:00
|
|
|
rb >> "query: " >> escape >> query >> "\n";
|
2017-04-18 15:44:31 +00:00
|
|
|
rb >> "hosts: " >> hosts >> "\n";
|
2017-05-30 11:49:17 +00:00
|
|
|
|
|
|
|
if (!rb.eof())
|
|
|
|
rb >> "initiator: " >> initiator >> "\n";
|
|
|
|
else
|
|
|
|
initiator.clear();
|
2017-04-17 17:04:31 +00:00
|
|
|
|
|
|
|
assertEOF(rb);
|
|
|
|
}
|
|
|
|
};
|
2017-04-13 13:42:29 +00:00
|
|
|
|
|
|
|
|
2017-05-30 11:49:17 +00:00
|
|
|
using ShardAndHostNum = std::experimental::optional<std::pair<size_t, size_t>>;
|
|
|
|
static ShardAndHostNum tryGetShardAndHostNum(const Cluster::AddressesWithFailover & cluster, const String & host_name, UInt16 port)
|
2017-04-25 15:21:03 +00:00
|
|
|
{
|
|
|
|
for (size_t shard_num = 0; shard_num < cluster.size(); ++shard_num)
|
|
|
|
{
|
|
|
|
for (size_t host_num = 0; host_num < cluster[shard_num].size(); ++host_num)
|
|
|
|
{
|
|
|
|
const Cluster::Address & address = cluster[shard_num][host_num];
|
|
|
|
if (address.host_name == host_name && address.port == port)
|
2017-05-30 11:49:17 +00:00
|
|
|
return std::make_pair(shard_num, host_num);
|
2017-04-25 15:21:03 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-05-30 11:49:17 +00:00
|
|
|
return {};
|
2017-04-25 15:21:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
static bool isSupportedAlterType(int type)
|
|
|
|
{
|
|
|
|
static const std::unordered_set<int> supported_alter_types{
|
|
|
|
ASTAlterQuery::ADD_COLUMN,
|
|
|
|
ASTAlterQuery::DROP_COLUMN,
|
|
|
|
ASTAlterQuery::MODIFY_COLUMN,
|
|
|
|
ASTAlterQuery::MODIFY_PRIMARY_KEY,
|
|
|
|
ASTAlterQuery::DROP_PARTITION
|
|
|
|
};
|
|
|
|
|
|
|
|
return supported_alter_types.count(type);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2017-04-13 16:12:56 +00:00
|
|
|
/// Remembers the queue location and this host's identity, then starts the worker thread.
///  zk_root_dir - ZooKeeper path of the DDL queue; one trailing '/' is stripped.
///  context_    - server context; used for the TCP port here and later by the worker thread.
DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_)
    : context(context_)
{
    queue_dir = zk_root_dir;
    /// back() on an empty string is undefined behaviour, so check for emptiness first.
    if (!queue_dir.empty() && queue_dir.back() == '/')
        queue_dir.pop_back();

    host_name = getFQDNOrHostName();
    port = context.getTCPPort();
    host_id = Cluster::Address::toString(host_name, port);

    event_queue_updated = std::make_shared<Poco::Event>();

    /// Started last: run() uses the members initialised above.
    thread = std::thread(&DDLWorker::run, this);
}
|
|
|
|
|
2017-04-13 16:12:56 +00:00
|
|
|
|
2017-04-13 13:42:29 +00:00
|
|
|
/// Asks the worker thread to stop and waits for it to finish.
DDLWorker::~DDLWorker()
{
    /// Raise the flag first, then signal the event run() waits on so that it notices the flag.
    stop_flag = true;
    event_queue_updated->set();

    thread.join();
}
|
|
|
|
|
2017-04-13 16:12:56 +00:00
|
|
|
|
2017-04-13 13:42:29 +00:00
|
|
|
void DDLWorker::processTasks()
|
|
|
|
{
|
2017-04-27 15:19:11 +00:00
|
|
|
LOG_DEBUG(log, "Processing tasks");
|
2017-04-13 16:12:56 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, event_queue_updated);
|
2017-04-18 15:44:31 +00:00
|
|
|
if (queue_nodes.empty())
|
2017-04-13 16:12:56 +00:00
|
|
|
return;
|
|
|
|
|
2017-04-17 17:04:31 +00:00
|
|
|
bool server_startup = last_processed_node_name.empty();
|
2017-04-13 16:12:56 +00:00
|
|
|
|
2017-04-17 17:04:31 +00:00
|
|
|
std::sort(queue_nodes.begin(), queue_nodes.end());
|
|
|
|
auto begin_node = server_startup
|
|
|
|
? queue_nodes.begin()
|
|
|
|
: std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_processed_node_name);
|
2017-04-13 16:12:56 +00:00
|
|
|
|
2017-04-17 17:04:31 +00:00
|
|
|
for (auto it = begin_node; it != queue_nodes.end(); ++it)
|
2017-04-13 16:12:56 +00:00
|
|
|
{
|
2017-05-31 14:01:08 +00:00
|
|
|
const String & node_name = *it;
|
|
|
|
String node_path = queue_dir + "/" + node_name;
|
|
|
|
String node_data;
|
2017-04-27 15:19:11 +00:00
|
|
|
|
2017-04-18 15:44:31 +00:00
|
|
|
if (!zookeeper->tryGet(node_path, node_data))
|
|
|
|
{
|
|
|
|
/// It is Ok that node could be deleted just now. It means that there are no current host in node's host list.
|
|
|
|
continue;
|
|
|
|
}
|
2017-04-17 17:04:31 +00:00
|
|
|
|
|
|
|
DDLLogEntry node;
|
|
|
|
node.parse(node_data);
|
|
|
|
|
2017-04-25 15:21:03 +00:00
|
|
|
bool host_in_hostlist = std::find(node.hosts.cbegin(), node.hosts.cend(), host_id) != node.hosts.cend();
|
2017-04-27 15:19:11 +00:00
|
|
|
bool already_processed = zookeeper->exists(node_path + "/finished/" + host_id);
|
2017-04-17 17:04:31 +00:00
|
|
|
|
|
|
|
if (!server_startup && already_processed)
|
|
|
|
{
|
|
|
|
throw Exception(
|
|
|
|
"Server expects that DDL node " + node_name + " should be processed, but it was already processed according to ZK",
|
|
|
|
ErrorCodes::LOGICAL_ERROR);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (host_in_hostlist && !already_processed)
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
processTask(node, node_name);
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
2017-04-27 15:19:11 +00:00
|
|
|
tryLogCurrentException(log, "An error occurred while processing node " + node_name + " (" + node.query + ")");
|
2017-04-17 17:04:31 +00:00
|
|
|
throw;
|
|
|
|
}
|
|
|
|
}
|
2017-04-27 15:19:11 +00:00
|
|
|
else
|
|
|
|
{
|
|
|
|
LOG_DEBUG(log, "Node " << node_name << " (" << node.query << ") will not be processed");
|
|
|
|
}
|
2017-04-17 17:04:31 +00:00
|
|
|
|
|
|
|
last_processed_node_name = node_name;
|
2017-04-13 16:12:56 +00:00
|
|
|
}
|
2017-04-13 13:42:29 +00:00
|
|
|
}
|
|
|
|
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
static bool tryExecuteQuery(const String & query, Context & context, ExecutionStatus & status, Logger * log = nullptr)
|
2017-04-25 15:21:03 +00:00
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
executeQuery(query, context);
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
2017-04-27 15:19:11 +00:00
|
|
|
status = ExecutionStatus::fromCurrentException();
|
|
|
|
|
2017-04-25 15:21:03 +00:00
|
|
|
if (log)
|
|
|
|
tryLogCurrentException(log, "Query " + query + " wasn't finished successfully");
|
|
|
|
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
status = ExecutionStatus(0);
|
2017-04-25 15:21:03 +00:00
|
|
|
if (log)
|
|
|
|
LOG_DEBUG(log, "Executed query: " << query);
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
void DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_name)
|
2017-04-25 15:21:03 +00:00
|
|
|
{
|
2017-04-27 15:19:11 +00:00
|
|
|
LOG_DEBUG(log, "Processing node " << node_name << " (" << node.query << ")");
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
String node_path = queue_dir + "/" + node_name;
|
2017-04-25 15:21:03 +00:00
|
|
|
createStatusDirs(node_path);
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
bool should_not_execute = current_node == node_name && current_node_was_executed;
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
if (!should_not_execute)
|
|
|
|
{
|
|
|
|
current_node = node_name;
|
|
|
|
current_node_was_executed = false;
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
zookeeper->create(node_path + "/active/" + host_id, "", zkutil::CreateMode::Ephemeral);
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
ASTPtr query_ast;
|
|
|
|
{
|
2017-07-10 03:41:02 +00:00
|
|
|
const char * begin = &node.query.front();
|
2017-07-12 02:40:28 +00:00
|
|
|
ParserQuery parser_query(begin + node.query.size());
|
|
|
|
String description;
|
2017-04-27 15:19:11 +00:00
|
|
|
query_ast = parseQuery(parser_query, begin, begin + node.query.size(), description);
|
|
|
|
}
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
const ASTQueryWithOnCluster * query;
|
|
|
|
if (!query_ast || !(query = dynamic_cast<const ASTQueryWithOnCluster *>(query_ast.get())))
|
|
|
|
throw Exception("Recieved unsupported DDL query", ErrorCodes::NOT_IMPLEMENTED);
|
|
|
|
|
|
|
|
String cluster_name = query->cluster;
|
|
|
|
auto cluster = context.getCluster(cluster_name);
|
|
|
|
|
2017-08-11 15:02:07 +00:00
|
|
|
auto shard_host_num = tryGetShardAndHostNum(cluster->getShardsAddresses(), host_name, port);
|
2017-05-30 11:49:17 +00:00
|
|
|
if (!shard_host_num)
|
2017-04-27 15:19:11 +00:00
|
|
|
{
|
|
|
|
throw Exception("Cannot find own address (" + host_id + ") in cluster " + cluster_name + " configuration",
|
|
|
|
ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
|
|
|
|
}
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-05-30 11:49:17 +00:00
|
|
|
size_t shard_num = shard_host_num->first;
|
|
|
|
size_t host_num = shard_host_num->second;
|
|
|
|
|
2017-08-11 15:02:07 +00:00
|
|
|
const auto & host_address = cluster->getShardsAddresses().at(shard_num).at(host_num);
|
2017-04-27 15:19:11 +00:00
|
|
|
ASTPtr rewritten_ast = query->getRewrittenASTWithoutOnCluster(host_address.default_database);
|
|
|
|
String rewritten_query = queryToString(rewritten_ast);
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
LOG_DEBUG(log, "Executing query: " << rewritten_query);
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
if (auto query_alter = dynamic_cast<const ASTAlterQuery *>(rewritten_ast.get()))
|
|
|
|
{
|
|
|
|
processTaskAlter(query_alter, rewritten_query, cluster, shard_num, node_path);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
tryExecuteQuery(rewritten_query, context, current_node_execution_status, log);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
catch (const zkutil::KeeperException & e)
|
|
|
|
{
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
current_node_execution_status = ExecutionStatus::fromCurrentException("An error occured during query preparation");
|
|
|
|
}
|
|
|
|
|
|
|
|
/// We need to distinguish ZK errors occured before and after query executing
|
|
|
|
current_node_was_executed = true;
|
2017-04-25 15:21:03 +00:00
|
|
|
}
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
/// Delete active flag and create finish flag
|
|
|
|
zkutil::Ops ops;
|
|
|
|
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(node_path + "/active/" + host_id, -1));
|
|
|
|
ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/finished/" + host_id,
|
|
|
|
current_node_execution_status.serializeText(), zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
int code = zookeeper->tryMultiWithRetries(ops);
|
|
|
|
if (code != ZOK && code != ZNONODE)
|
|
|
|
throw zkutil::KeeperException("Cannot commit executed node " + node_name, code);
|
|
|
|
}
|
2017-04-25 15:21:03 +00:00
|
|
|
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
void DDLWorker::processTaskAlter(
|
|
|
|
const ASTAlterQuery * query_alter,
|
|
|
|
const String & rewritten_query,
|
|
|
|
const std::shared_ptr<Cluster> & cluster,
|
|
|
|
ssize_t shard_num,
|
|
|
|
const String & node_path)
|
|
|
|
{
|
|
|
|
String database = query_alter->database.empty() ? context.getCurrentDatabase() : query_alter->database;
|
|
|
|
StoragePtr storage = context.getTable(database, query_alter->table);
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
bool execute_once_on_replica = storage->supportsReplication();
|
|
|
|
bool execute_on_leader_replica = false;
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
for (const auto & param : query_alter->parameters)
|
2017-04-25 15:21:03 +00:00
|
|
|
{
|
2017-04-27 15:19:11 +00:00
|
|
|
if (!isSupportedAlterType(param.type))
|
|
|
|
throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
if (execute_once_on_replica)
|
|
|
|
execute_on_leader_replica |= param.type == ASTAlterQuery::DROP_PARTITION;
|
|
|
|
}
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
const auto & shard_info = cluster->getShardsInfo().at(shard_num);
|
|
|
|
bool config_is_replicated_shard = shard_info.hasInternalReplication();
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
if (execute_once_on_replica && !config_is_replicated_shard)
|
|
|
|
{
|
|
|
|
throw Exception("Table " + query_alter->table + " is replicated, but shard #" + toString(shard_num + 1) +
|
|
|
|
" isn't replicated according to its cluster definition", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
|
|
|
|
}
|
|
|
|
else if (!execute_once_on_replica && config_is_replicated_shard)
|
|
|
|
{
|
|
|
|
throw Exception("Table " + query_alter->table + " isn't replicated, but shard #" + toString(shard_num + 1) +
|
|
|
|
" replicated according to its cluster definition", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
|
|
|
|
}
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
if (execute_once_on_replica)
|
|
|
|
{
|
2017-05-14 21:57:30 +00:00
|
|
|
/// The following code may perform ALTER twice if
|
|
|
|
/// current secver aquires lock, executes replicated alter,
|
|
|
|
/// losts zookeeper connection and doesn't have time to create /executed node, second server executes replicated alter again
|
|
|
|
/// To avoid this problem alter() method of replicated tables should be changed and takes into account ddl query id tag.
|
2017-05-30 11:49:17 +00:00
|
|
|
if (!context.getSettingsRef().distributed_ddl_allow_replicated_alter)
|
|
|
|
throw Exception("Distributed DDL alters don't work properly yet", ErrorCodes::NOT_IMPLEMENTED);
|
2017-05-14 21:57:30 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
Strings replica_names;
|
2017-08-11 15:02:07 +00:00
|
|
|
for (const auto & address : cluster->getShardsAddresses().at(shard_num))
|
2017-04-27 15:19:11 +00:00
|
|
|
replica_names.emplace_back(address.toString());
|
|
|
|
std::sort(replica_names.begin(), replica_names.end());
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
String shard_dir_name;
|
|
|
|
for (auto it = replica_names.begin(); it != replica_names.end(); ++it)
|
|
|
|
shard_dir_name += *it + (std::next(it) != replica_names.end() ? "," : "");
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
String shard_path = node_path + "/shards/" + shard_dir_name;
|
|
|
|
String is_executed_path = shard_path + "/executed";
|
|
|
|
zookeeper->createAncestors(shard_path + "/");
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
bool alter_executed_by_replica = false;
|
|
|
|
{
|
|
|
|
auto zookeeper_holder = std::make_shared<zkutil::ZooKeeperHolder>();
|
|
|
|
zookeeper_holder->initFromInstance(zookeeper);
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
zkutil::Lock lock(zookeeper_holder, shard_path, "lock", host_id);
|
|
|
|
std::mt19937 rng(std::hash<String>{}(host_id) + reinterpret_cast<intptr_t>(&rng));
|
|
|
|
|
|
|
|
for (int num_tries = 0; num_tries < 10; ++num_tries)
|
2017-04-25 15:21:03 +00:00
|
|
|
{
|
2017-04-27 15:19:11 +00:00
|
|
|
if (zookeeper->exists(is_executed_path))
|
|
|
|
{
|
|
|
|
alter_executed_by_replica = true;
|
|
|
|
break;
|
|
|
|
}
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
if (lock.tryLock())
|
2017-04-25 15:21:03 +00:00
|
|
|
{
|
2017-04-27 15:19:11 +00:00
|
|
|
tryExecuteQuery(rewritten_query, context, current_node_execution_status, log);
|
2017-04-25 15:21:03 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
if (execute_on_leader_replica && current_node_execution_status.code == ErrorCodes::NOT_IMPLEMENTED)
|
2017-04-25 15:21:03 +00:00
|
|
|
{
|
2017-04-27 15:19:11 +00:00
|
|
|
/// TODO: it is ok to recieve exception "host is not leader"
|
2017-04-25 15:21:03 +00:00
|
|
|
}
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
zookeeper->create(is_executed_path, host_id, zkutil::CreateMode::Persistent);
|
|
|
|
lock.unlock();
|
|
|
|
alter_executed_by_replica = true;
|
|
|
|
break;
|
2017-04-25 15:21:03 +00:00
|
|
|
}
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, 1)(rng)));
|
2017-04-25 15:21:03 +00:00
|
|
|
}
|
|
|
|
}
|
2017-04-27 15:19:11 +00:00
|
|
|
|
|
|
|
if (!alter_executed_by_replica)
|
|
|
|
current_node_execution_status = ExecutionStatus(ErrorCodes::NOT_IMPLEMENTED, "Cannot enqueue replicated DDL query");
|
2017-04-25 15:21:03 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2017-04-27 15:19:11 +00:00
|
|
|
tryExecuteQuery(rewritten_query, context, current_node_execution_status, log);
|
2017-04-25 15:21:03 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2017-04-19 14:21:27 +00:00
|
|
|
void DDLWorker::cleanupQueue(const Strings * node_names_to_check)
|
|
|
|
{
|
|
|
|
/// Both ZK and Poco use Unix epoch
|
|
|
|
size_t current_time_seconds = Poco::Timestamp().epochTime();
|
|
|
|
constexpr size_t zookeeper_time_resolution = 1000;
|
|
|
|
|
|
|
|
// Too early to check
|
2017-04-27 15:19:11 +00:00
|
|
|
if (last_cleanup_time_seconds && current_time_seconds < last_cleanup_time_seconds + cleanup_min_period_seconds)
|
2017-04-19 14:21:27 +00:00
|
|
|
return;
|
|
|
|
|
|
|
|
last_cleanup_time_seconds = current_time_seconds;
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
LOG_DEBUG(log, "Cleaning queue");
|
|
|
|
|
2017-04-19 14:21:27 +00:00
|
|
|
String data;
|
|
|
|
zkutil::Stat stat;
|
|
|
|
DDLLogEntry node;
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
Strings node_names_fetched = node_names_to_check ? Strings{} : zookeeper->getChildren(queue_dir);
|
2017-04-19 14:21:27 +00:00
|
|
|
const Strings & node_names = (node_names_to_check) ? *node_names_to_check : node_names_fetched;
|
|
|
|
|
|
|
|
for (const String & node_name : node_names)
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
2017-04-27 15:19:11 +00:00
|
|
|
String node_path = queue_dir + "/" + node_name;
|
2017-04-19 14:21:27 +00:00
|
|
|
if (!zookeeper->tryGet(node_path, data, &stat))
|
|
|
|
continue;
|
|
|
|
|
2017-04-25 15:21:03 +00:00
|
|
|
/// TODO: Add shared lock to avoid rare race counditions.
|
2017-04-19 14:21:27 +00:00
|
|
|
|
|
|
|
size_t zookeeper_time_seconds = stat.mtime / zookeeper_time_resolution;
|
|
|
|
if (zookeeper_time_seconds + node_max_lifetime_seconds < current_time_seconds)
|
|
|
|
{
|
|
|
|
size_t lifetime_seconds = current_time_seconds - zookeeper_time_seconds;
|
|
|
|
LOG_INFO(log, "Lifetime of node " << node_name << " (" << lifetime_seconds << " sec.) is expired, deleting it");
|
|
|
|
zookeeper->removeRecursive(node_path);
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
Strings finished_nodes = zookeeper->getChildren(node_path + "/finished");
|
2017-04-25 15:21:03 +00:00
|
|
|
node.parse(data);
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
if (finished_nodes.size() >= node.hosts.size())
|
2017-04-19 14:21:27 +00:00
|
|
|
{
|
|
|
|
LOG_INFO(log, "Node " << node_name << " had been executed by each host, deleting it");
|
|
|
|
zookeeper->removeRecursive(node_path);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(log, "An error occured while checking and cleaning node " + node_name + " from queue");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-13 16:12:56 +00:00
|
|
|
|
2017-04-17 17:04:31 +00:00
|
|
|
/// Try to create unexisting "status" dirs for a node
|
|
|
|
void DDLWorker::createStatusDirs(const std::string & node_path)
|
2017-04-13 13:42:29 +00:00
|
|
|
{
|
2017-04-17 17:04:31 +00:00
|
|
|
zkutil::Ops ops;
|
2017-04-27 15:19:11 +00:00
|
|
|
auto acl = zookeeper->getDefaultACL();
|
2017-04-17 17:04:31 +00:00
|
|
|
ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/active", "", acl, zkutil::CreateMode::Persistent));
|
2017-04-27 15:19:11 +00:00
|
|
|
ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/finished", "", acl, zkutil::CreateMode::Persistent));
|
2017-04-13 13:42:29 +00:00
|
|
|
|
2017-04-17 17:04:31 +00:00
|
|
|
int code = zookeeper->tryMulti(ops);
|
|
|
|
if (code != ZOK && code != ZNODEEXISTS)
|
|
|
|
throw zkutil::KeeperException(code);
|
|
|
|
}
|
2017-04-13 13:42:29 +00:00
|
|
|
|
|
|
|
|
2017-04-18 15:44:31 +00:00
|
|
|
/// Puts the entry into the DDL queue as a new persistent sequential "query-" node
/// and creates its status directories.
/// Returns the path of the created node, or an empty string when the entry has no hosts
/// (nothing is enqueued in that case).
String DDLWorker::enqueueQuery(DDLLogEntry & entry)
{
    const bool nothing_to_do = entry.hosts.empty();
    if (nothing_to_do)
        return {};

    String query_path_prefix = queue_dir + "/query-";
    zookeeper->createAncestors(query_path_prefix);

    /// The returned path is the one actually created for the sequential node.
    String created_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
    createStatusDirs(created_path);

    return created_path;
}
|
|
|
|
|
2017-04-13 16:12:56 +00:00
|
|
|
|
2017-04-13 13:42:29 +00:00
|
|
|
void DDLWorker::run()
|
|
|
|
{
|
2017-04-27 15:19:11 +00:00
|
|
|
setThreadName("DDLWorker");
|
2017-04-18 15:44:31 +00:00
|
|
|
LOG_DEBUG(log, "Started DDLWorker thread");
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
zookeeper = context.getZooKeeper();
|
|
|
|
zookeeper->createAncestors(queue_dir + "/");
|
|
|
|
|
2017-04-13 13:42:29 +00:00
|
|
|
while (!stop_flag)
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
2017-04-25 15:21:03 +00:00
|
|
|
processTasks();
|
2017-04-13 13:42:29 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
LOG_DEBUG(log, "Waiting watch");
|
|
|
|
event_queue_updated->wait();
|
|
|
|
|
|
|
|
if (stop_flag)
|
|
|
|
break;
|
2017-04-25 15:21:03 +00:00
|
|
|
|
|
|
|
cleanupQueue();
|
2017-04-19 14:21:27 +00:00
|
|
|
}
|
2017-04-27 15:19:11 +00:00
|
|
|
catch (zkutil::KeeperException &)
|
|
|
|
{
|
|
|
|
LOG_DEBUG(log, "Recovering ZooKeeper session");
|
|
|
|
zookeeper = context.getZooKeeper();
|
|
|
|
}
|
2017-04-19 14:21:27 +00:00
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(log);
|
2017-04-27 15:19:11 +00:00
|
|
|
throw;
|
2017-04-19 14:21:27 +00:00
|
|
|
}
|
2017-04-13 13:42:29 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-13 16:12:56 +00:00
|
|
|
|
2017-04-18 15:44:31 +00:00
|
|
|
class DDLQueryStatusInputSream : public IProfilingBlockInputStream
|
2017-04-13 16:12:56 +00:00
|
|
|
{
|
2017-04-18 15:44:31 +00:00
|
|
|
public:
|
2017-04-19 14:21:27 +00:00
|
|
|
|
2017-04-18 15:44:31 +00:00
|
|
|
DDLQueryStatusInputSream(const String & zk_node_path, Context & context, size_t num_hosts)
|
2017-04-27 15:19:11 +00:00
|
|
|
: node_path(zk_node_path), context(context), watch(CLOCK_MONOTONIC_COARSE)
|
2017-04-18 15:44:31 +00:00
|
|
|
{
|
|
|
|
sample = Block{
|
|
|
|
{std::make_shared<DataTypeString>(), "host"},
|
|
|
|
{std::make_shared<DataTypeUInt64>(), "status"},
|
|
|
|
{std::make_shared<DataTypeString>(), "error"},
|
|
|
|
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
|
|
|
|
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
|
|
|
|
};
|
|
|
|
|
|
|
|
setTotalRowsApprox(num_hosts);
|
|
|
|
}
|
|
|
|
|
|
|
|
String getName() const override
|
|
|
|
{
|
|
|
|
return "DDLQueryStatusInputSream";
|
|
|
|
}
|
|
|
|
|
|
|
|
String getID() const override
|
|
|
|
{
|
|
|
|
return "DDLQueryStatusInputSream(" + node_path + ")";
|
|
|
|
}
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
static constexpr size_t timeout_seconds = 120;
|
|
|
|
|
2017-04-18 15:44:31 +00:00
|
|
|
Block readImpl() override
|
|
|
|
{
|
|
|
|
Block res;
|
|
|
|
if (num_hosts_finished >= total_rows_approx)
|
|
|
|
return res;
|
|
|
|
|
|
|
|
auto zookeeper = context.getZooKeeper();
|
|
|
|
size_t try_number = 0;
|
|
|
|
|
|
|
|
while(res.rows() == 0)
|
|
|
|
{
|
|
|
|
if (is_cancelled)
|
|
|
|
return res;
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
auto elapsed_seconds = watch.elapsedSeconds();
|
|
|
|
if (elapsed_seconds > timeout_seconds)
|
2017-05-30 11:49:17 +00:00
|
|
|
throw Exception("Watching query is executing too long (" + toString(std::round(elapsed_seconds)) + " sec.)", ErrorCodes::TIMEOUT_EXCEEDED);
|
2017-04-27 15:19:11 +00:00
|
|
|
|
2017-04-18 15:44:31 +00:00
|
|
|
if (num_hosts_finished != 0 || try_number != 0)
|
2017-04-21 12:39:28 +00:00
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(50 * std::min(20LU, try_number + 1)));
|
2017-04-18 15:44:31 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
/// TODO: add shared lock
|
2017-04-18 15:44:31 +00:00
|
|
|
if (!zookeeper->exists(node_path))
|
2017-04-21 12:39:28 +00:00
|
|
|
{
|
|
|
|
throw Exception("Cannot provide query execution status. The query's node " + node_path
|
|
|
|
+ " had been deleted by cleaner since it was finished (or its lifetime is expired)",
|
|
|
|
ErrorCodes::UNFINISHED);
|
|
|
|
}
|
2017-04-18 15:44:31 +00:00
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
Strings new_hosts = getNewAndUpdate(finished_hosts_set, getChildrenAllowNoNode(zookeeper, node_path + "/finished"));
|
2017-04-18 15:44:31 +00:00
|
|
|
++try_number;
|
|
|
|
if (new_hosts.empty())
|
|
|
|
continue;
|
|
|
|
|
2017-04-21 12:39:28 +00:00
|
|
|
Strings cur_active_hosts = getChildrenAllowNoNode(zookeeper, node_path + "/active");
|
|
|
|
|
2017-04-18 15:44:31 +00:00
|
|
|
res = sample.cloneEmpty();
|
2017-04-27 15:19:11 +00:00
|
|
|
for (const String & host : new_hosts)
|
2017-04-18 15:44:31 +00:00
|
|
|
{
|
2017-04-27 15:19:11 +00:00
|
|
|
ExecutionStatus status(1, "Cannot obtain error message");
|
|
|
|
{
|
|
|
|
String status_data;
|
|
|
|
if (zookeeper->tryGet(node_path + "/finished/" + host, status_data))
|
|
|
|
status.deserializeText(status_data);
|
|
|
|
}
|
|
|
|
|
|
|
|
res.getByName("host").column->insert(host);
|
|
|
|
res.getByName("status").column->insert(static_cast<UInt64>(status.code));
|
|
|
|
res.getByName("error").column->insert(status.message);
|
2017-04-18 15:44:31 +00:00
|
|
|
res.getByName("num_hosts_remaining").column->insert(total_rows_approx - (++num_hosts_finished));
|
2017-04-21 12:39:28 +00:00
|
|
|
res.getByName("num_hosts_active").column->insert(cur_active_hosts.size());
|
2017-04-18 15:44:31 +00:00
|
|
|
}
|
|
|
|
}
|
2017-04-13 16:12:56 +00:00
|
|
|
|
2017-04-18 15:44:31 +00:00
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
2017-04-21 12:39:28 +00:00
|
|
|
static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
|
|
|
|
{
|
|
|
|
Strings res;
|
|
|
|
int code = zookeeper->tryGetChildren(node_path, res);
|
|
|
|
if (code != ZOK && code != ZNONODE)
|
|
|
|
throw zkutil::KeeperException(code, node_path);
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
|
|
|
static Strings getNewAndUpdate(NameSet & prev, const Strings & cur_list)
|
2017-04-18 15:44:31 +00:00
|
|
|
{
|
|
|
|
Strings diff;
|
|
|
|
for (const String & elem : cur_list)
|
|
|
|
{
|
|
|
|
if (!prev.count(elem))
|
2017-04-19 14:21:27 +00:00
|
|
|
{
|
2017-04-18 15:44:31 +00:00
|
|
|
diff.emplace_back(elem);
|
2017-04-19 14:21:27 +00:00
|
|
|
prev.emplace(elem);
|
|
|
|
}
|
2017-04-18 15:44:31 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return diff;
|
|
|
|
}
|
|
|
|
|
|
|
|
~DDLQueryStatusInputSream() override = default;
|
|
|
|
|
|
|
|
Block sample;
|
|
|
|
|
|
|
|
private:
|
|
|
|
String node_path;
|
|
|
|
Context & context;
|
|
|
|
|
2017-04-27 15:19:11 +00:00
|
|
|
Stopwatch watch;
|
|
|
|
|
|
|
|
NameSet finished_hosts_set;
|
2017-04-18 15:44:31 +00:00
|
|
|
size_t num_hosts_finished = 0;
|
2017-04-17 17:04:31 +00:00
|
|
|
};
|
2017-04-13 16:12:56 +00:00
|
|
|
|
|
|
|
|
2017-04-25 15:21:03 +00:00
|
|
|
/// Enqueues an ON CLUSTER DDL query for execution on every host of the target cluster.
///  query_ptr - AST of the query; must be an ASTQueryWithOnCluster, and for ALTER only supported parameter types
///  context   - provides the cluster definition and the DDLWorker
/// Returns a BlockIO whose input stream reports per-host execution status; when the cluster has
/// no hosts nothing is enqueued and an empty BlockIO is returned.
/// Throws NOT_IMPLEMENTED for unsupported queries.
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, Context & context)
{
    const auto query = dynamic_cast<const ASTQueryWithOnCluster *>(query_ptr.get());
    if (!query)
    {
        throw Exception("Distributed execution is not supported for such DDL queries",
                        ErrorCodes::NOT_IMPLEMENTED);
    }

    /// Reject unsupported ALTERs on the initiator, before anything is put into the queue.
    auto query_alter = dynamic_cast<const ASTAlterQuery *>(query_ptr.get());
    if (query_alter)
    {
        for (const auto & param : query_alter->parameters)
        {
            if (!isSupportedAlterType(param.type))
                throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);
        }
    }

    ClusterPtr cluster = context.getCluster(query->cluster);
    DDLWorker & ddl_worker = context.getDDLWorker();

    DDLLogEntry entry;
    entry.query = queryToString(query_ptr);
    entry.initiator = ddl_worker.getHostName();

    /// Bound by const reference: the address table is only read, so there is no need to copy it.
    const Cluster::AddressesWithFailover & shards = cluster->getShardsAddresses();
    for (const auto & shard : shards)
    {
        for (const auto & addr : shard)
            entry.hosts.emplace_back(addr.toString());
    }

    String node_path = ddl_worker.enqueueQuery(entry);

    BlockIO io;
    /// enqueueQuery() returns an empty path when there were no hosts to run on.
    if (node_path.empty())
        return io;

    auto stream = std::make_shared<DDLQueryStatusInputSream>(node_path, context, entry.hosts.size());
    io.in_sample = stream->sample.cloneEmpty();
    io.in = std::move(stream);
    return io;
}
|
|
|
|
|
|
|
|
|
2017-04-13 13:42:29 +00:00
|
|
|
}
|