Workable version for CREATE DROP w/o ZK tests. [#CLICKHOUSE-5]

commit 209015574f (parent aa7504a7f5)
@@ -83,28 +83,7 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded_stacktrace)
     }
     catch (const Exception & e)
     {
-        try
-        {
-            std::string text = e.displayText();
-
-            bool has_embedded_stack_trace = false;
-            if (check_embedded_stacktrace)
-            {
-                auto embedded_stack_trace_pos = text.find("Stack trace");
-                has_embedded_stack_trace = embedded_stack_trace_pos != std::string::npos;
-                if (!with_stacktrace && has_embedded_stack_trace)
-                {
-                    text.resize(embedded_stack_trace_pos);
-                    Poco::trimRightInPlace(text);
-                }
-            }
-
-            stream << "Code: " << e.code() << ", e.displayText() = " << text << ", e.what() = " << e.what();
-
-            if (with_stacktrace && !has_embedded_stack_trace)
-                stream << ", Stack trace:\n\n" << e.getStackTrace().toString();
-        }
-        catch (...) {}
+        stream << getExceptionMessage(e, with_stacktrace, check_embedded_stacktrace);
     }
     catch (const Poco::Exception & e)
     {
@@ -230,6 +209,36 @@ void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::string & start_of_message)
     }
 }
 
+std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace)
+{
+    std::stringstream stream;
+
+    try
+    {
+        std::string text = e.displayText();
+
+        bool has_embedded_stack_trace = false;
+        if (check_embedded_stacktrace)
+        {
+            auto embedded_stack_trace_pos = text.find("Stack trace");
+            has_embedded_stack_trace = embedded_stack_trace_pos != std::string::npos;
+            if (!with_stacktrace && has_embedded_stack_trace)
+            {
+                text.resize(embedded_stack_trace_pos);
+                Poco::trimRightInPlace(text);
+            }
+        }
+
+        stream << "Code: " << e.code() << ", e.displayText() = " << text << ", e.what() = " << e.what();
+
+        if (with_stacktrace && !has_embedded_stack_trace)
+            stream << ", Stack trace:\n\n" << e.getStackTrace().toString();
+    }
+    catch (...) {}
+
+    return stream.str();
+}
+
 std::string getExceptionMessage(std::exception_ptr e, bool with_stacktrace)
 {
     try
@@ -92,6 +92,7 @@ int getCurrentExceptionCode();
 void tryLogException(std::exception_ptr e, const char * log_name, const std::string & start_of_message = "");
 void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::string & start_of_message = "");
 
+std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace = false);
 std::string getExceptionMessage(std::exception_ptr e, bool with_stacktrace);
 
 
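A short usage sketch of the new overload (the error text and code are hypothetical; the trimming behaviour follows the implementation above). With check_embedded_stacktrace = true and with_stacktrace = false, a "Stack trace" section already embedded in displayText() (typical for exceptions received from a remote server) is cut off, which keeps the text compact enough to store as a ZooKeeper node value:

    // e is a DB::Exception caught somewhere above; the resulting text looks like
    // "Code: 60, e.displayText() = Table test.hits doesn't exist., e.what() = DB::Exception"
    // (error code and message are hypothetical).
    std::string msg = getExceptionMessage(e, /* with_stacktrace = */ false,
                                          /* check_embedded_stacktrace = */ true);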
@@ -22,6 +22,10 @@ struct ColumnWithTypeAndName
     ColumnWithTypeAndName(const ColumnPtr & column_, const DataTypePtr & type_, const String name_)
         : column(column_), type(type_), name(name_) {}
 
+    /// Uses type->createColumn() to create column
+    ColumnWithTypeAndName(const DataTypePtr & type_, const String name_)
+        : column(type_->createColumn()), type(type_), name(name_) {}
+
     ColumnWithTypeAndName cloneEmpty() const;
     bool operator==(const ColumnWithTypeAndName & other) const;
     String prettyPrint() const;
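The new convenience constructor is what lets executeDDLQueryOnCluster further down in this diff declare its result header as a brace-enclosed list; a minimal sketch (the second column is hypothetical):

    Block block{
        {std::make_shared<DataTypeUInt64>(), "num_hosts_total"},    /// column is allocated via type->createColumn()
        {std::make_shared<DataTypeString>(), "host_name"}           /// hypothetical extra column
    };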
@@ -25,7 +25,7 @@ struct BlockIO
     Block out_sample;    /// Example of a block to be written to `out`.
 
     /// Callbacks for query logging could be set here.
-	std::function<void(IBlockInputStream *, IBlockOutputStream *)> finish_callback;
+    std::function<void(IBlockInputStream *, IBlockOutputStream *)> finish_callback;
     std::function<void()> exception_callback;
 
     /// Call these functions if you want to log the request.
@@ -44,18 +44,18 @@ struct BlockIO
     BlockIO & operator= (const BlockIO & rhs)
     {
         /// We provide the correct order of destruction.
-		out = nullptr;
-		in = nullptr;
-		process_list_entry = nullptr;
+        out = nullptr;
+        in = nullptr;
+        process_list_entry = nullptr;
 
-		process_list_entry = rhs.process_list_entry;
-		in = rhs.in;
-		out = rhs.out;
-		in_sample = rhs.in_sample;
-		out_sample = rhs.out_sample;
+        process_list_entry = rhs.process_list_entry;
+        in = rhs.in;
+        out = rhs.out;
+        in_sample = rhs.in_sample;
+        out_sample = rhs.out_sample;
 
-		finish_callback = rhs.finish_callback;
-		exception_callback = rhs.exception_callback;
+        finish_callback = rhs.finish_callback;
+        exception_callback = rhs.exception_callback;
 
         return *this;
     }
@@ -4,6 +4,7 @@
 #include <Common/SimpleCache.h>
 #include <Common/StringUtils.h>
 #include <IO/HexWriteBuffer.h>
+#include <IO/WriteHelpers.h>
 #include <Poco/Util/AbstractConfiguration.h>
 #include <Poco/Util/Application.h>
 #include <openssl/sha.h>
@@ -108,6 +109,11 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const String & password_)
     }
 }
 
+String Cluster::Address::toString() const
+{
+    return host_name + ':' + DB::toString(port);
+}
+
 /// Implementation of Clusters class
 
 Clusters::Clusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
@@ -58,6 +58,9 @@ public:
 
     Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix);
     Address(const String & host_port_, const String & user_, const String & password_);
+
+    /// Returns 'host_name:port'
+    String toString() const;
 };
 
 using Addresses = std::vector<Address>;
@@ -1,8 +1,22 @@
 #include <Interpreters/DDLWorker.h>
-#include <Interpreters/InterpreterCreateQuery.h>
-#include <Interpreters/executeQuery.h>
+
+#include <Parsers/parseQuery.h>
+#include <Parsers/ASTCreateQuery.h>
+#include <Parsers/ASTDropQuery.h>
+#include <Parsers/ASTAlterQuery.h>
+
+#include <Interpreters/executeQuery.h>
+#include <Interpreters/Cluster.h>
+#include <DataStreams/OneBlockInputStream.h>
+#include <Common/getFQDNOrHostName.h>
+
+#include <Core/Block.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <DataTypes/DataTypeString.h>
+#include <DataTypes/DataTypeArray.h>
+#include <Columns/ColumnsNumber.h>
+#include <Columns/ColumnString.h>
+#include <Columns/ColumnArray.h>
 
 #include <zkutil/ZooKeeper.h>
 
@@ -19,58 +33,47 @@ namespace {
 
-/// Helper class which extracts from the ClickHouse configuration file
-/// the parameters we need for operating the resharding thread.
-class Arguments final
-{
-public:
-    Arguments(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
-    {
-        Poco::Util::AbstractConfiguration::Keys keys;
-        config.keys(config_name, keys);
-
-        for (const auto & key : keys)
-        {
-            if (key == "task_queue_path")
-                task_queue_path = config.getString(config_name + "." + key);
-            else
-                throw Exception{"Unknown parameter in resharding configuration", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
-        }
-
-        if (task_queue_path.empty())
-            throw Exception{"Resharding: missing parameter task_queue_path", ErrorCodes::INVALID_CONFIG_PARAMETER};
-    }
-
-    Arguments(const Arguments &) = delete;
-    Arguments & operator=(const Arguments &) = delete;
-
-    std::string getTaskQueuePath() const
-    {
-        return task_queue_path;
-    }
-
-private:
-    std::string task_queue_path;
-};
+// struct Arguments
+// {
+// public:
+//     Arguments(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
+//     {
+//         Poco::Util::AbstractConfiguration::Keys keys;
+//         config.keys(config_name, keys);
+//
+//         for (const auto & key : keys)
+//         {
+//             if (key == "distributed_ddl_root")
+//                 ddl_queries_root = config.getString(config_name + "." + key);
+//             else
+//                 throw Exception{"Unknown parameter in distributed DDL configuration", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
+//         }
+//
+//         if (ddl_queries_root.empty())
+//             throw Exception{"Distributed DDL: missing parameter distributed_ddl_root", ErrorCodes::INVALID_CONFIG_PARAMETER};
+//     }
+//
+//     std::string ddl_queries_root;
+// };
 
 }
 
-DDLWorker::DDLWorker(const Poco::Util::AbstractConfiguration & config,
-    const std::string & config_name, Context & context_)
-    : context(context_)
-    , stop_flag(false)
+DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_)
+    : context(context_), stop_flag(false)
 {
-    Arguments arguments(config, config_name);
-    auto zookeeper = context.getZooKeeper();
+    root_dir = zk_root_dir;
+    if (root_dir.back() == '/')
+        root_dir.resize(root_dir.size() - 1);
 
-    std::string root = arguments.getTaskQueuePath();
-    if (root.back() != '/')
-        root += "/";
-
-    auto current_host = getFQDNOrHostName();
-    host_task_queue_path = "/clickhouse/task_queue/ddl/" + current_host;
+    hostname = getFQDNOrHostName() + ':' + DB::toString(context.getTCPPort());
+    assign_dir = getAssignsDir() + "/" + hostname;
+    master_dir = getMastersDir() + "/" + hostname;
 
     thread = std::thread(&DDLWorker::run, this);
 }
 
 
 DDLWorker::~DDLWorker()
 {
     stop_flag = true;
@@ -78,43 +81,141 @@ DDLWorker::~DDLWorker()
     thread.join();
 }
 
-void DDLWorker::processTasks()
-{
-    processCreate(host_task_queue_path + "/create");
-}
-
-void DDLWorker::processCreate(const std::string & path)
+void DDLWorker::processTasks()
 {
     auto zookeeper = context.getZooKeeper();
 
-    if (!zookeeper->exists(path))
+    if (!zookeeper->exists(assign_dir))
         return;
 
-    const Strings & children = zookeeper->getChildren(path);
+    Strings tasks = zookeeper->getChildren(assign_dir);
+    if (tasks.empty())
+        return;
 
-    for (const auto & name : children)
-    {
-        const std::string & query_path = path + "/" + name;
+    String current_task = *std::min_element(tasks.cbegin(), tasks.cend());
 
-        try
-        {
-            std::string value;
-            if (zookeeper->tryGet(query_path, value))
-            {
-                if (!value.empty())
-                    executeQuery(value, context);
-                zookeeper->remove(query_path);
-            }
-        }
-        catch (const std::exception & ex)
-        {
-            LOG_ERROR(log, ex.what() + std::string(" on ") + query_path);
-        }
-    }
-}
+    try
+    {
+        processTask(current_task);
+    }
+    catch (...)
+    {
+        tryLogCurrentException(log, "An unexpected error occurred during task " + current_task);
+        throw;
+    }
+}
+
+bool DDLWorker::processTask(const std::string & task)
+{
+    auto zookeeper = context.getZooKeeper();
+
+    String query_dir = root_dir + "/" + task;
+    String assign_node = assign_dir + "/" + task;
+    String active_node = query_dir + "/active/" + hostname;
+    String success_node = query_dir + "/success/" + hostname;
+    String fail_node = query_dir + "/failed/" + hostname;
+
+    String query = zookeeper->get(query_dir);
+
+    if (zookeeper->exists(success_node) || zookeeper->exists(fail_node))
+    {
+        throw Exception(
+            "Task " + task + " (query " + query + ") was already processed by node " + hostname, ErrorCodes::LOGICAL_ERROR);
+    }
+
+    /// Create active flag
+    zookeeper->create(active_node, "", zkutil::CreateMode::Ephemeral);
+
+    /// Will delete task from host's task list, delete active flag and ...
+    zkutil::Ops ops;
+    ops.emplace_back(std::make_unique<zkutil::Op::Remove>(assign_node, -1));
+    ops.emplace_back(std::make_unique<zkutil::Op::Remove>(active_node, -1));
+
+    try
+    {
+        executeQuery(query, context);
+    }
+    catch (...)
+    {
+        /// ... and create fail flag
+        String exception_msg = getCurrentExceptionMessage(false, true);
+        ops.emplace_back(std::make_unique<zkutil::Op::Create>(fail_node, exception_msg, nullptr, zkutil::CreateMode::Persistent));
+        zookeeper->multi(ops);
+
+        tryLogCurrentException(log, "Query " + query + " wasn't finished successfully");
+        return false;
+    }
+
+    /// ... and create success flag
+    ops.emplace_back(std::make_unique<zkutil::Op::Create>(success_node, "", nullptr, zkutil::CreateMode::Persistent));
+    zookeeper->multi(ops);
+
+    return true;
+}
+
+
+void DDLWorker::enqueueQuery(const String & query, const std::vector<Cluster::Address> & addrs)
+{
+    auto zookeeper = context.getZooKeeper();
+
+    String assigns_dir = getAssignsDir();
+    String master_dir = getCurrentMasterDir();
+    String query_path_prefix = getRoot() + "/query-";
+
+    zookeeper->createAncestors(assigns_dir + "/");
+    zookeeper->createAncestors(master_dir + "/");
+    String query_path = zookeeper->create(query_path_prefix, query, zkutil::CreateMode::PersistentSequential);
+    String query_node = query_path.substr(query_path.find_last_of('/') + 1);
+
+    zkutil::Ops ops;
+    auto acl = zookeeper->getDefaultACL();
+    constexpr size_t max_ops_per_call = 100;
+
+    /// Create /root/masters/query_node and /root/query-node/* to monitor status of the tasks initiated by us
+    {
+        String num_hosts = toString(addrs.size());
+        String master_query_node = master_dir + "/" + query_node;
+        ops.emplace_back(std::make_unique<zkutil::Op::Create>(master_query_node, "", acl, zkutil::CreateMode::Persistent));
+        ops.emplace_back(std::make_unique<zkutil::Op::Create>(query_path + "/active", "", acl, zkutil::CreateMode::Persistent));
+        ops.emplace_back(std::make_unique<zkutil::Op::Create>(query_path + "/success", num_hosts, acl, zkutil::CreateMode::Persistent));
+        ops.emplace_back(std::make_unique<zkutil::Op::Create>(query_path + "/failed", "", acl, zkutil::CreateMode::Persistent));
+        zookeeper->multi(ops);
+        ops.clear();
+    }
+
+    /// Create host's task dir /root/assigns/host (if not exists)
+    for (auto it = addrs.cbegin(); it != addrs.cend(); ++it)
+    {
+        String cur_assign_dir = assigns_dir + "/" + it->toString();
+        ops.emplace_back(std::make_unique<zkutil::Op::Create>(cur_assign_dir, "", acl, zkutil::CreateMode::Persistent));
+
+        if (ops.size() > max_ops_per_call || std::next(it) == addrs.cend())
+        {
+            int code = zookeeper->tryMulti(ops);
+            ops.clear();
+
+            if (code != ZOK && code != ZNODEEXISTS)
+                throw zkutil::KeeperException(code);
+        }
+    }
+
+    /// Assign tasks to hosts /root/assigns/host/query_node
+    for (auto it = addrs.cbegin(); it != addrs.cend(); ++it)
+    {
+        String cur_task_path = assigns_dir + "/" + it->toString() + "/" + query_node;
+        ops.emplace_back(std::make_unique<zkutil::Op::Create>(cur_task_path, "", acl, zkutil::CreateMode::Persistent));
+
+        if (ops.size() > max_ops_per_call || std::next(it) == addrs.cend())
+        {
+            zookeeper->multi(ops);
+            ops.clear();
+        }
+    }
+}
 
 
 void DDLWorker::run()
 {
     using namespace std::chrono_literals;
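Note how processTask() batches its ZooKeeper writes: the Remove of the assign node and of the ephemeral active flag is committed in the same multi() call that creates the success or failed flag, so a task disappears from the host's queue atomically together with its recorded outcome. If the server crashes between executeQuery() and multi(), the assign node survives and the ephemeral active flag expires with the session, leaving the task visible again.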
@@ -135,4 +236,101 @@ void DDLWorker::run()
     }
 }
 
+
+static bool getRemoteQueryExecutionStatus(const Cluster::Address & addr, const std::string & query, Exception & out_exception)
+{
+    Connection conn(addr.host_name, addr.port, "", addr.user, addr.password);
+    conn.sendQuery(query);
+
+    while (true)
+    {
+        Connection::Packet packet = conn.receivePacket();
+
+        if (packet.type == Protocol::Server::Exception)
+        {
+            out_exception = *packet.exception;
+            return false;
+        }
+        else if (packet.type == Protocol::Server::EndOfStream)
+            break;
+    }
+
+    return true;
+}
+
+
+BlockIO executeDDLQueryOnCluster(const String & query, const String & cluster_name, Context & context)
+{
+    ClusterPtr cluster = context.getCluster(cluster_name);
+    Cluster::AddressesWithFailover shards = cluster->getShardsWithFailoverAddresses();
+    std::vector<Cluster::Address> pending_hosts;
+
+    Array hosts_names_failed, hosts_errors, hosts_names_pending;
+    size_t num_hosts_total = 0;
+    size_t num_hosts_finished_successfully = 0;
+
+    for (const auto & shard : shards)
+    {
+        for (const auto & addr : shard)
+        {
+            try
+            {
+                Exception ex;
+                if (!getRemoteQueryExecutionStatus(addr, query, ex))
+                {
+                    /// Normal error
+                    String exception_msg = getExceptionMessage(ex, false, true);
+                    hosts_names_failed.emplace_back(addr.host_name);
+                    hosts_errors.emplace_back(exception_msg);
+
+                    LOG_INFO(&Logger::get("DDLWorker"), "Query " << query << " failed on " << addr.host_name << ": " << exception_msg);
+                }
+                else
+                {
+                    ++num_hosts_finished_successfully;
+                }
+            }
+            catch (...)
+            {
+                /// Network error
+                pending_hosts.emplace_back(addr);
+                hosts_names_pending.emplace_back(addr.host_name);
+            }
+        }
+        num_hosts_total += shard.size();
+    }
+
+    if (!pending_hosts.empty())
+        context.getDDLWorker().enqueueQuery(query, pending_hosts);
+
+    auto array_of_strings = std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>());
+    Block block{
+        {std::make_shared<DataTypeUInt64>(), "num_hosts_total"},
+        {std::make_shared<DataTypeUInt64>(), "num_hosts_finished_successfully"},
+        {std::make_shared<DataTypeUInt64>(), "num_hosts_finished_unsuccessfully"},
+        {std::make_shared<DataTypeUInt64>(), "num_hosts_pending"},
+        {array_of_strings->clone(), "hosts_finished_unsuccessfully"},
+        {array_of_strings->clone(), "hosts_finished_unsuccessfully_errors"},
+        {array_of_strings->clone(), "hosts_pending"}
+    };
+
+    size_t num_hosts_finished = num_hosts_total - pending_hosts.size();
+    size_t num_hosts_finished_unsuccessfully = num_hosts_finished - num_hosts_finished_successfully;
+    block.getByName("num_hosts_total").column->insert(num_hosts_total);
+    block.getByName("num_hosts_finished_successfully").column->insert(num_hosts_finished_successfully);
+    block.getByName("num_hosts_finished_unsuccessfully").column->insert(num_hosts_finished_unsuccessfully);
+    block.getByName("num_hosts_pending").column->insert(pending_hosts.size());
+    block.getByName("hosts_finished_unsuccessfully").column->insert(hosts_names_failed);
+    block.getByName("hosts_finished_unsuccessfully_errors").column->insert(hosts_errors);
+    block.getByName("hosts_pending").column->insert(hosts_names_pending);
+
+    BlockIO io;
+    io.in_sample = block.cloneEmpty();
+    io.in = std::make_shared<OneBlockInputStream>(block);
+    return io;
+}
+
 
 }
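For illustration only (hypothetical hosts and error text): a query run on a three-host cluster where one host returned an error and one was unreachable would produce a single row along the lines of num_hosts_total = 3, num_hosts_finished_successfully = 1, num_hosts_finished_unsuccessfully = 1, num_hosts_pending = 1, hosts_finished_unsuccessfully = ['host2'], hosts_finished_unsuccessfully_errors = ['Code: 57, e.displayText() = Table already exists. ...'], hosts_pending = ['host3']; the unreachable host's copy of the query is left in the ZooKeeper queue via enqueueQuery().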
@@ -1,6 +1,7 @@
 #pragma once
 
 #include <Interpreters/Context.h>
+#include <Interpreters/Cluster.h>
 #include <DataStreams/BlockIO.h>
 #include <common/logger_useful.h>
 
 #include <atomic>
@@ -12,16 +13,50 @@
 namespace DB
 {
 
 
+BlockIO executeDDLQueryOnCluster(const String & query, const String & cluster_name, Context & context);
+
+
 class DDLWorker
 {
 public:
-    DDLWorker(const Poco::Util::AbstractConfiguration & config,
-        const std::string & config_name, Context & context_);
+    DDLWorker(const std::string & zk_root_dir, Context & context_);
     ~DDLWorker();
 
+    void enqueueQuery(const String & query, const std::vector<Cluster::Address> & addrs);
+
+    /// Returns the root path in ZooKeeper
+    std::string getRoot() const
+    {
+        return root_dir;
+    }
+
+    std::string getAssignsDir() const
+    {
+        return root_dir + "/assigns";
+    }
+
+    std::string getMastersDir() const
+    {
+        return root_dir + "/masters";
+    }
+
+    std::string getCurrentMasterDir() const
+    {
+        return getMastersDir() + "/" + getHostName();
+    }
+
+    std::string getHostName() const
+    {
+        return hostname;
+    }
+
 private:
     void processTasks();
-    void processCreate(const std::string & path);
+    bool processTask(const std::string & task);
+
+    void processQueries();
+    bool processQuery(const std::string & task);
 
     void run();
@@ -29,7 +64,10 @@ private:
     Context & context;
     Logger * log = &Logger::get("DDLWorker");
 
-    std::string host_task_queue_path;
+    std::string hostname;
+    std::string root_dir;      /// common dir with the queue of queries
+    std::string assign_dir;    /// dir with tasks assigned to the server
+    std::string master_dir;    /// dir with queries initiated by the server
 
     std::atomic<bool> stop_flag;
     std::condition_variable cond_var;
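Taken together, the getters and members above imply the following znode layout under root_dir (assembled from the paths used elsewhere in this diff; query-0000000001 stands for a hypothetical sequential node name):

    <root_dir>/query-0000000001                      -- data: the rewritten query text
    <root_dir>/query-0000000001/active/<host:port>   -- ephemeral: host is currently executing
    <root_dir>/query-0000000001/success/<host:port>  -- persistent: host finished successfully
    <root_dir>/query-0000000001/failed/<host:port>   -- persistent: data holds the error message
    <root_dir>/assigns/<host:port>/query-0000000001  -- per-host queue of assigned tasks
    <root_dir>/masters/<host:port>/query-0000000001  -- tasks initiated by this host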
@@ -23,6 +23,7 @@
 #include <Parsers/ASTLiteral.h>
 #include <Parsers/ParserCreateQuery.h>
 #include <Parsers/parseQuery.h>
+#include <Parsers/queryToString.h>
 
 #include <Storages/StorageFactory.h>
 #include <Storages/StorageLog.h>
@@ -31,6 +32,7 @@
 #include <Interpreters/InterpreterSelectQuery.h>
 #include <Interpreters/InterpreterCreateQuery.h>
 #include <Interpreters/ExpressionAnalyzer.h>
+#include <Interpreters/DDLWorker.h>
 
 #include <DataTypes/DataTypesNumber.h>
 #include <DataTypes/DataTypeNested.h>
@@ -57,21 +59,6 @@ namespace ErrorCodes
     extern const int DUPLICATE_COLUMN;
 }
 
-static void ExecuteQuery(const Cluster::Address & addr, const std::string & query)
-{
-    Connection conn(addr.host_name, addr.port, "", addr.user, addr.password);
-    conn.sendQuery(query);
-
-    while (true)
-    {
-        Connection::Packet packet = conn.receivePacket();
-
-        if (packet.type == Protocol::Server::Exception)
-            throw Exception(*packet.exception.get());
-        else if (packet.type == Protocol::Server::EndOfStream)
-            break;
-    }
-}
-
 InterpreterCreateQuery::InterpreterCreateQuery(const ASTPtr & query_ptr_, Context & context_)
     : query_ptr(query_ptr_), context(context_)
@@ -474,7 +461,7 @@ String InterpreterCreateQuery::setEngine(
 }
 
 
-BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
+BlockIO InterpreterCreateQuery::createTableOnServer(ASTCreateQuery & create)
 {
     String path = context.getPath();
     String current_database = context.getCurrentDatabase();
@@ -578,63 +565,16 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
     return {};
 }
 
-ASTPtr InterpreterCreateQuery::createQueryWithoutCluster(ASTCreateQuery & create) const
-{
-    ASTPtr cloned = create.clone();
-    ASTCreateQuery & tmp = typeid_cast<ASTCreateQuery &>(*cloned);
-    tmp.cluster.clear();
-    if (tmp.database.empty())
-        tmp.database = context.getCurrentDatabase();
-    return cloned;
-}
-
-void InterpreterCreateQuery::writeToZookeeper(ASTPtr query, const std::vector<Cluster::Address> & addrs)
-{
-    auto zookeeper = context.getZooKeeper();
-    ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query);
-    std::string table_name = create.database + "." + create.table;
-
-    for (const auto & addr : addrs)
-    {
-        const std::string & path =
-            "/clickhouse/task_queue/ddl/" +
-            addr.host_name +
-            "/create/" + table_name;
-
-        // TODO catch exceptions
-        zookeeper->createAncestors(path);
-        zookeeper->create(path, formatASTToString(*query), 0);
-    }
-}
-
 BlockIO InterpreterCreateQuery::createTableOnCluster(ASTCreateQuery & create)
 {
-    ClusterPtr cluster = context.getCluster(create.cluster);
-    Cluster::AddressesWithFailover shards = cluster->getShardsWithFailoverAddresses();
-    ASTPtr query_ptr = createQueryWithoutCluster(create);
-    std::string query = formatASTToString(*query_ptr);
-    std::vector<Cluster::Address> failed;
-
-    for (const auto & shard : shards)
-    {
-        for (const auto & addr : shard)
-        {
-            try
-            {
-                ExecuteQuery(addr, query);
-            }
-            catch (...)
-            {
-                failed.push_back(addr);
-            }
-        }
-    }
-
-    writeToZookeeper(query_ptr, failed);
-
-    return {};
+    /// Should we really use that database for each server?
+    String query = create.getRewrittenQueryWithoutOnCluster(context.getCurrentDatabase());
+
+    return executeDDLQueryOnCluster(query, create.cluster, context);
 }
 
 
 BlockIO InterpreterCreateQuery::execute()
 {
     ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query_ptr);
@@ -648,7 +588,7 @@ BlockIO InterpreterCreateQuery::execute()
     else if (!create.cluster.empty())
         return createTableOnCluster(create);
     else
-        return createTable(create);
+        return createTableOnServer(create);
 }
 
 
@@ -56,12 +56,9 @@ public:
 
 private:
     void createDatabase(ASTCreateQuery & create);
-    BlockIO createTable(ASTCreateQuery & create);
+    BlockIO createTableOnServer(ASTCreateQuery & create);
     BlockIO createTableOnCluster(ASTCreateQuery & create);
 
-    ASTPtr createQueryWithoutCluster(ASTCreateQuery & create) const;
-    void writeToZookeeper(ASTPtr query, const std::vector<Cluster::Address> & addrs);
-
     /// Calculate list of columns of table and return it.
     ColumnsInfo setColumns(ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const;
     String setEngine(ASTCreateQuery & create, const StoragePtr & as_storage) const;
@@ -6,6 +6,7 @@
 #include <Interpreters/InterpreterDropQuery.h>
 #include <Storages/IStorage.h>
 #include <Databases/IDatabase.h>
+#include <Interpreters/DDLWorker.h>
 
 
 namespace DB
@@ -26,12 +27,20 @@ InterpreterDropQuery::InterpreterDropQuery(const ASTPtr & query_ptr_, Context & context_)
 
 
 BlockIO InterpreterDropQuery::execute()
 {
+    ASTDropQuery & drop = typeid_cast<ASTDropQuery &>(*query_ptr);
+
+    if (drop.cluster.empty())
+        return executeOnServer(drop);
+    else
+        return executeOnCluster(drop);
+}
+
+BlockIO InterpreterDropQuery::executeOnServer(ASTDropQuery & drop)
+{
     String path = context.getPath();
     String current_database = context.getCurrentDatabase();
 
-    ASTDropQuery & drop = typeid_cast<ASTDropQuery &>(*query_ptr);
-
     bool drop_database = drop.table.empty() && !drop.database.empty();
 
     if (drop_database && drop.detach)
@@ -144,4 +153,11 @@ BlockIO InterpreterDropQuery::execute()
 }
 
 
+BlockIO InterpreterDropQuery::executeOnCluster(ASTDropQuery & drop)
+{
+    String query = drop.getRewrittenQueryWithoutOnCluster(context.getCurrentDatabase());
+    return executeDDLQueryOnCluster(query, drop.cluster, context);
+}
+
+
 }
@@ -18,12 +18,15 @@ class InterpreterDropQuery : public IInterpreter
 public:
     InterpreterDropQuery(const ASTPtr & query_ptr_, Context & context_);
 
-    /// Drop table.
+    /// Drop table or database.
     BlockIO execute() override;
 
 private:
     ASTPtr query_ptr;
     Context & context;
+
+    BlockIO executeOnServer(ASTDropQuery & drop);
+    BlockIO executeOnCluster(ASTDropQuery & drop);
 };
 
 
@@ -2,15 +2,15 @@
 
 #include <Parsers/ASTExpressionList.h>
 #include <Parsers/ASTFunction.h>
+#include <Parsers/ASTDDLQueryWithOnCluster.h>
 
 
 namespace DB
 {
 
-/** CREATE TABLE or ATTACH TABLE query
-  */
-class ASTCreateQuery : public IAST
+/// CREATE TABLE or ATTACH TABLE query
+class ASTCreateQuery : public IAST, public ASTDDLQueryWithOnCluster
 {
 public:
     bool attach{false};    /// Query ATTACH TABLE, not CREATE TABLE.
@@ -21,7 +21,6 @@ public:
     bool is_temporary{false};
     String database;
     String table;
-    String cluster;
     ASTPtr columns;
     ASTPtr storage;
     ASTPtr inner_storage;    /// Internal engine for the CREATE MATERIALIZED VIEW query
@@ -48,6 +47,18 @@ public:
         return res;
     }
 
+    ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database) const override
+    {
+        auto query_ptr = clone();
+        ASTCreateQuery & query = static_cast<ASTCreateQuery &>(*query_ptr);
+
+        query.cluster.clear();
+        if (query.database.empty())
+            query.database = new_database;
+
+        return query_ptr;
+    }
+
 protected:
     void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
     {
|
||||
<< (settings.hilite ? hilite_keyword : "")
|
||||
<< (attach ? "ATTACH " : "CREATE ")
|
||||
<< (is_temporary ? "TEMPORARY " : "")
|
||||
<< what
|
||||
<< " " << (if_not_exists ? "IF NOT EXISTS " : "")
|
||||
<< what << " "
|
||||
<< (if_not_exists ? "IF NOT EXISTS " : "")
|
||||
<< (settings.hilite ? hilite_none : "")
|
||||
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
|
||||
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table) << " "
|
||||
<< (!cluster.empty() ? "ON CLUSTER " + backQuoteIfNeed(cluster) + " " : "");
|
||||
}
|
||||
|
||||
if (!as_table.empty())
|
||||
|
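With this change, formatting an ASTCreateQuery whose cluster field is set reproduces the ON CLUSTER clause, e.g. CREATE TABLE db.t ON CLUSTER my_cluster (names hypothetical; backQuoteIfNeed() quotes identifiers only when necessary), so a formatted query round-trips through the new ON CLUSTER parsing further below.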
dbms/src/Parsers/ASTDDLQueryWithOnCluster.cpp (new file, 12 lines)
@@ -0,0 +1,12 @@
+#include <Parsers/ASTDDLQueryWithOnCluster.h>
+#include <Parsers/queryToString.h>
+
+
+namespace DB
+{
+
+std::string ASTDDLQueryWithOnCluster::getRewrittenQueryWithoutOnCluster(const std::string & new_database) const
+{
+    return queryToString(getRewrittenASTWithoutOnCluster(new_database));
+}
+
+}
dbms/src/Parsers/ASTDDLQueryWithOnCluster.h (new file, 22 lines)
@@ -0,0 +1,22 @@
+#pragma once
+
+#include <Parsers/IAST.h>
+
+
+namespace DB
+{
+
+class ASTDDLQueryWithOnCluster
+{
+public:
+    String cluster;
+
+    virtual ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database = {}) const = 0;
+
+    std::string getRewrittenQueryWithoutOnCluster(const std::string & new_database = {}) const;
+
+    virtual ~ASTDDLQueryWithOnCluster() = default;
+};
+
+}
@@ -1,7 +1,7 @@
 #pragma once
 
 #include <Parsers/IAST.h>
-
+#include <Parsers/ASTDDLQueryWithOnCluster.h>
 
 namespace DB
 {
@@ -9,7 +9,7 @@ namespace DB
 
 /** DROP query
   */
-class ASTDropQuery : public IAST
+class ASTDropQuery : public IAST, public ASTDDLQueryWithOnCluster
 {
 public:
     bool detach{false};    /// DETACH query, not DROP.
@@ -25,6 +25,18 @@ public:
 
     ASTPtr clone() const override { return std::make_shared<ASTDropQuery>(*this); }
 
+    ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database) const override
+    {
+        auto query_ptr = clone();
+        ASTDropQuery & query = static_cast<ASTDropQuery &>(*query_ptr);
+
+        query.cluster.clear();
+        if (query.database.empty())
+            query.database = new_database;
+
+        return query_ptr;
+    }
+
 protected:
     void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
     {
@@ -41,7 +53,8 @@ protected:
         settings.ostr << (settings.hilite ? hilite_keyword : "")
             << (detach ? "DETACH TABLE " : "DROP TABLE ")
             << (if_exists ? "IF EXISTS " : "") << (settings.hilite ? hilite_none : "")
-            << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
+            << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table)
+            << (!cluster.empty() ? " ON CLUSTER " + backQuoteIfNeed(cluster) + " " : "");
     }
 };
 
@@ -151,8 +151,6 @@ bool ParserCreateQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
     ParserString s_attach("ATTACH", true, true);
     ParserString s_table("TABLE", true, true);
     ParserString s_database("DATABASE", true, true);
-    ParserString s_on("ON", true, true);
-    ParserString s_cluster("CLUSTER", true, true);
     ParserString s_dot(".");
     ParserString s_lparen("(");
     ParserString s_rparen(")");
@@ -247,11 +245,11 @@ bool ParserCreateQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
         ws.ignore(pos, end);
     }
 
-    if (s_on.ignore(pos, end, max_parsed_pos, expected))
+    if (ParserString{"ON", true, true}.ignore(pos, end, max_parsed_pos, expected))
     {
         ws.ignore(pos, end);
 
-        if (!s_cluster.ignore(pos, end, max_parsed_pos, expected))
+        if (!ParserString{"CLUSTER", true, true}.ignore(pos, end, max_parsed_pos, expected))
             return false;
 
         ws.ignore(pos, end);
@@ -27,6 +27,7 @@ bool ParserDropQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
 
     ASTPtr database;
     ASTPtr table;
+    ASTPtr cluster;
     bool detach = false;
     bool if_exists = false;
 
@@ -81,6 +82,21 @@ bool ParserDropQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
 
             ws.ignore(pos, end);
         }
+
+        if (ParserString{"ON", true, true}.ignore(pos, end, max_parsed_pos, expected))
+        {
+            ws.ignore(pos, end);
+
+            if (!ParserString{"CLUSTER", true, true}.ignore(pos, end, max_parsed_pos, expected))
+                return false;
+
+            ws.ignore(pos, end);
+
+            if (!name_p.parse(pos, end, cluster, max_parsed_pos, expected))
+                return false;
+
+            ws.ignore(pos, end);
+        }
     }
 
     ws.ignore(pos, end);
@@ -94,6 +110,8 @@ bool ParserDropQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
         query->database = typeid_cast<ASTIdentifier &>(*database).name;
     if (table)
         query->table = typeid_cast<ASTIdentifier &>(*table).name;
+    if (cluster)
+        query->cluster = typeid_cast<ASTIdentifier &>(*cluster).name;
 
     return true;
 }
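After this change the drop parser accepts, for example, DROP TABLE db.hits ON CLUSTER my_cluster (names hypothetical); the cluster identifier ends up in the cluster field that ASTDropQuery inherits from ASTDDLQueryWithOnCluster, and InterpreterDropQuery::execute() routes such a query to executeOnCluster().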
@@ -11,12 +11,6 @@ void formatAST(const IAST & ast, std::ostream & s, size_t indent, bool hilite, bool one_line)
     ast.format(settings);
 }
 
-std::string formatASTToString(const IAST & ast)
-{
-    std::stringstream s;
-    formatAST(ast, s, 0, false, true);
-    return s.str();
-}
-
 String formatColumnsForCreateQuery(NamesAndTypesList & columns)
 {
@@ -14,8 +14,6 @@ namespace DB
   */
 void formatAST(const IAST & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false);
 
-std::string formatASTToString(const IAST & ast);
-
 String formatColumnsForCreateQuery(NamesAndTypesList & columns);
 
 inline std::ostream & operator<<(std::ostream & os, const IAST & ast) { return formatAST(ast, os, 0, false, true), os; }
@@ -416,11 +416,11 @@ int Server::main(const std::vector<std::string> & args)
 
     if (has_zookeeper && config().has("distributed_ddl"))
     {
-        auto ddl_worker = std::make_shared<DDLWorker>(config(), "distributed_ddl", *global_context);
-        global_context->setDDLWorker(ddl_worker);
+        String ddl_zookeeper_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
+        global_context->setDDLWorker(std::make_shared<DDLWorker>(ddl_zookeeper_path, *global_context));
     }
 
-    SCOPE_EXIT(
+    SCOPE_EXIT({
         /** Ask to cancel background jobs all table engines,
           * and also query_log.
           * It is important to do early, not in destructor of Context, because
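For reference, a minimal server-config sketch that would activate the worker, assuming the standard ClickHouse XML configuration layout (the path value shown is the default used in the code above):

    <distributed_ddl>
        <path>/clickhouse/task_queue/ddl/</path>
    </distributed_ddl>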
@@ -82,12 +82,12 @@ public:
         for (const auto & key : keys)
         {
             if (key == "task_queue_path")
-                task_queue_path = config.getString(config_name + "." + key);
+                ddl_queries_root = config.getString(config_name + "." + key);
             else
                 throw Exception{"Unknown parameter in resharding configuration", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
         }
 
-        if (task_queue_path.empty())
+        if (ddl_queries_root.empty())
             throw Exception{"Resharding: missing parameter task_queue_path", ErrorCodes::INVALID_CONFIG_PARAMETER};
     }
 
@@ -96,11 +96,11 @@ public:
 
     std::string getTaskQueuePath() const
     {
-        return task_queue_path;
+        return ddl_queries_root;
     }
 
 private:
-    std::string task_queue_path;
+    std::string ddl_queries_root;
 };
 
 /// Helper class we use to read and write the status of a coordinator