squashed commits

This commit is contained in:
Nikita Mikhaylov 2020-02-19 18:01:08 +03:00
parent 3e0b76a077
commit 24cbd0d6d1
13 changed files with 2645 additions and 2434 deletions

View File

@ -0,0 +1,26 @@
#pragma once
#include <Interpreters/Cluster.h>
namespace DB
{
/// Shared type aliases and forward declarations used by the copier headers.
/// Smart pointer (Poco::AutoPtr) to a Poco configuration object.
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
/// Pair of (database name, table name), in that order (see getQuotedTable).
using DatabaseAndTableName = std::pair<String, String>;
/// Hierarchical description of the tasks
/// Only forward declarations here; the aliases below merely name container types of them.
struct ShardPartition;
struct TaskShard;
struct TaskTable;
struct TaskCluster;
struct ClusterPartition;
/// ShardPartition objects keyed by a String (presumably the partition name - TODO confirm),
/// iterated in descending key order because of std::greater<>.
using TasksPartition = std::map<String, ShardPartition, std::greater<>>;
using ShardInfo = Cluster::ShardInfo;
/// Shards are held through shared pointers.
using TaskShardPtr = std::shared_ptr<TaskShard>;
using TasksShard = std::vector<TaskShardPtr>;
/// Table tasks are stored in a std::list.
using TasksTable = std::list<TaskTable>;
/// ClusterPartition objects keyed by a String, iterated in descending key order.
using ClusterPartitions = std::map<String, ClusterPartition, std::greater<>>;
}

View File

@ -1,5 +1,17 @@
# NOTE(review): the two single-line set() calls directly below assign the same variables as the
# multi-line set() calls that follow them, so only the later values take effect.
set(CLICKHOUSE_COPIER_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/ClusterCopier.cpp)
set(CLICKHOUSE_COPIER_LINK PRIVATE clickhouse_common_zookeeper clickhouse_parsers clickhouse_functions clickhouse_table_functions clickhouse_aggregate_functions clickhouse_dictionaries string_utils ${Poco_XML_LIBRARY} PUBLIC daemon)
# Source files of the copier program.
set(CLICKHOUSE_COPIER_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/ClusterCopierApp.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ClusterCopier.cpp
${CMAKE_CURRENT_SOURCE_DIR}/Internals.cpp)
# Libraries linked into the copier: PRIVATE dependencies first, `daemon` is linked as PUBLIC.
set(CLICKHOUSE_COPIER_LINK PRIVATE
clickhouse_common_zookeeper
clickhouse_parsers
clickhouse_functions
clickhouse_table_functions
clickhouse_aggregate_functions
clickhouse_dictionaries
string_utils ${Poco_XML_LIBRARY} PUBLIC daemon)
# Extra include directory (PCG random), added as a SYSTEM PRIVATE include.
set(CLICKHOUSE_COPIER_INCLUDE SYSTEM PRIVATE ${PCG_RANDOM_INCLUDE_DIR})
# Project macro; presumably consumes the CLICKHOUSE_COPIER_* variables set above - TODO confirm.
clickhouse_program_add(copier)

File diff suppressed because it is too large Load Diff

View File

@ -1,89 +1,175 @@
#pragma once
#include <Poco/Util/ServerApplication.h>
#include <daemon/BaseDaemon.h>
/* clickhouse cluster copier util
* Copies tables data from one cluster to new tables of other (possibly the same) cluster in distributed fault-tolerant manner.
*
* See overview in the docs: docs/en/utils/clickhouse-copier.md
*
* Implementation details:
*
* cluster-copier workers pull each partition of each shard of the source cluster and push it to the destination cluster through
* Distributed table (to perform data resharding). So, worker job is a partition of a source shard.
* A job has three states: Active, Finished and Abandoned. Abandoned means that worker died and did not finish the job.
*
* If an error occurred during the copying (a worker failed or a worker did not finish the INSERT), then the whole partition (on
* all destination servers) should be dropped and refilled. So, copying entity is a partition of all destination shards.
* If a failure is detected, a special /is_dirty node is created in ZooKeeper signalling that other workers copying the same partition
* should stop; after that, a refilling procedure should start.
*
* ZooKeeper task node has the following structure:
* /task/path_root - path passed in --task-path parameter
* /description - contains user-defined XML config of the task
* /task_active_workers - contains ephemeral nodes of all currently active workers, used to implement max_workers limitation
* /server_fqdn#timestamp_PID - cluster-copier worker ID
* ...
* /tables - directory with table tasks
* /cluster.db.table1 - directory of table_hits task
* /partition1 - directory for partition1
* /shards - directory for source cluster shards
* /1 - worker job for the first shard of partition1 of table test.hits
* Contains info about current status (Active or Finished) and worker ID.
* /2
* ...
* /partition_active_workers
* /1 - for each job in /shards a corresponding ephemeral node created in /partition_active_workers
* It is used to detect Abandoned jobs (if there is Active node in /shards and there is no node in
* /partition_active_workers).
* Also, it is used to track active workers in the partition (when we need to refill the partition we do
* not DROP PARTITION while there are active workers)
* /2
* ...
* /is_dirty - the node is set if some worker detected that an error occurred (an INSERT failed or an Abandoned node was
* detected). If the node appears, workers in this partition should stop and start the partition cleaning and
* refilling procedure.
* During this procedure a single 'cleaner' worker is selected. The worker waits for stopping all partition
* workers, removes /shards node, executes DROP PARTITION on each destination node and removes /is_dirty node.
* /cleaner- An ephemeral node used to select 'cleaner' worker. Contains ID of the worker.
* /cluster.db.table2
* ...
*/
#include "Aliases.h"
#include "Internals.h"
#include "TaskCluster.h"
#include "TaskTableAndShard.h"
#include "ShardPartition.h"
#include "ZooKeeperStaff.h"
namespace DB
{
class ClusterCopierApp : public BaseDaemon
class ClusterCopier
{
public:
void initialize(Poco::Util::Application & self) override;
/// Stores the ZooKeeper task path, the worker identifier and the name of the working database,
/// keeps a reference to the caller-provided Context and obtains the "ClusterCopier" logger.
ClusterCopier(
    const String & task_path_,
    const String & host_id_,
    const String & proxy_database_name_,
    Context & context_)
    : task_zookeeper_path(task_path_)
    , host_id(host_id_)
    , working_database_name(proxy_database_name_)
    , context(context_)
    , log(&Poco::Logger::get("ClusterCopier"))
{
}
void handleHelp(const std::string &, const std::string &);
void init();
void defineOptions(Poco::Util::OptionSet & options) override;
template<typename T>
decltype(auto) retry(T &&func, UInt64 max_tries = 100);
int main(const std::vector<std::string> &) override;
void discoverShardPartitions(const ConnectionTimeouts & timeouts, const TaskShardPtr & task_shard) ;
/// Compute set of partitions, assume set of partitions aren't changed during the processing
void discoverTablePartitions(const ConnectionTimeouts &timeouts, TaskTable &task_table, UInt64 num_threads = 0);
void uploadTaskDescription(const std::string &task_path, const std::string &task_file, const bool force);
void reloadTaskDescription();
void updateConfigIfNeeded();
void process(const ConnectionTimeouts & timeouts);
/// Disables DROP PARTITION commands that used to clear data after errors
void setSafeMode(bool is_safe_mode_ = true)
{
is_safe_mode = is_safe_mode_;
}
void setCopyFaultProbability(double copy_fault_probability_)
{
copy_fault_probability = copy_fault_probability_;
}
protected:
/// Path of the "task_active_workers" node located under the task's ZooKeeper path.
String getWorkersPath() const
{
    String workers_path = task_cluster->task_zookeeper_path;
    workers_path += "/task_active_workers";
    return workers_path;
}
/// Path of the node named like the workers node with a "_version" suffix.
String getWorkersPathVersion() const
{
    String version_path = getWorkersPath();
    version_path += "_version";
    return version_path;
}
/// Path of this worker's own node: "<workers path>/<host_id>".
String getCurrentWorkerNodePath() const
{
    String worker_node_path = getWorkersPath();
    worker_node_path += "/";
    worker_node_path += host_id;
    return worker_node_path;
}
zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(
const zkutil::ZooKeeperPtr &zookeeper,
const String &description,
bool unprioritized);
/** Checks that the whole partition of a table was copied. We should do it carefully due to dirty lock.
* State of some task could change during the processing.
* We have to ensure that all shards have the finished state and there is no dirty flag.
* Moreover, we have to check status twice and check zxid, because state can change during the checking.
*/
bool checkPartitionIsDone(const TaskTable &task_table, const String &partition_name,
const TasksShard &shards_with_partition);
/// Removes MATERIALIZED and ALIAS columns from create table query
static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr &query_ast);
/// Replaces ENGINE and table name in a create query
std::shared_ptr<ASTCreateQuery>
rewriteCreateQueryStorage(const ASTPtr &create_query_ast, const DatabaseAndTableName &new_table,
const ASTPtr &new_storage_ast);
bool tryDropPartition(ShardPartition &task_partition,
const zkutil::ZooKeeperPtr &zookeeper,
const CleanStateClock &clean_state_clock);
static constexpr UInt64 max_table_tries = 1000;
static constexpr UInt64 max_shard_partition_tries = 600;
bool tryProcessTable(const ConnectionTimeouts &timeouts, TaskTable &task_table);
PartitionTaskStatus tryProcessPartitionTask(const ConnectionTimeouts & timeouts,
ShardPartition & task_partition,
bool is_unprioritized_task);
PartitionTaskStatus processPartitionTaskImpl(const ConnectionTimeouts & timeouts,
ShardPartition & task_partition,
bool is_unprioritized_task);
void dropAndCreateLocalTable(const ASTPtr & create_ast);
void dropLocalTableIfExists (const DatabaseAndTableName & table_name) const;
String getRemoteCreateTable(const DatabaseAndTableName & table,
Connection & connection,
const Settings * settings = nullptr);
ASTPtr getCreateTableForPullShard(const ConnectionTimeouts & timeouts,
TaskShard & task_shard);
void createShardInternalTables(const ConnectionTimeouts & timeouts,
TaskShard & task_shard,
bool create_split = true);
std::set<String> getShardPartitions(const ConnectionTimeouts & timeouts,
TaskShard & task_shard);
bool checkShardHasPartition(const ConnectionTimeouts & timeouts,
TaskShard & task_shard,
const String & partition_quoted_name);
/** Executes simple query (without output streams, for example DDL queries) on each shard of the cluster
* Returns number of shards for which at least one replica executed query successfully
*/
UInt64 executeQueryOnCluster(
const ClusterPtr &cluster,
const String &query,
const ASTPtr &query_ast_ = nullptr,
const Settings *settings = nullptr,
PoolMode pool_mode = PoolMode::GET_ALL,
UInt64 max_successful_executions_per_shard = 0) const;
private:
String task_zookeeper_path;
String task_description_path;
String host_id;
String working_database_name;
using Base = BaseDaemon;
/// Auto update config stuff
UInt64 task_descprtion_current_version = 1;
std::atomic<UInt64> task_descprtion_version{1};
Coordination::WatchCallback task_description_watch_callback;
/// ZooKeeper session used to set the callback
zkutil::ZooKeeperPtr task_description_watch_zookeeper;
void mainImpl();
ConfigurationPtr task_cluster_initial_config;
ConfigurationPtr task_cluster_current_config;
Coordination::Stat task_descprtion_current_stat{};
void setupLogging();
std::unique_ptr<TaskCluster> task_cluster;
std::string config_xml_path;
std::string task_path;
std::string log_level = "debug";
bool is_safe_mode = false;
double copy_fault_probability = 0;
bool is_help = false;
double copy_fault_probability = 0.0;
std::string base_dir;
std::string process_path;
std::string process_id;
std::string host_id;
Context &context;
Poco::Logger *log;
std::chrono::milliseconds default_sleep_time{1000};
};
}

View File

@ -0,0 +1,172 @@
#include "ClusterCopierApp.h"
namespace DB
{
/// ClusterCopierApp
/// Reads the command line bindings from config(), creates the per-process working directory
/// and forwards the logging related options to BaseDaemon before initializing it.
void ClusterCopierApp::initialize(Poco::Util::Application & self)
{
    /// When help was requested nothing else is configured.
    is_help = config().has("help");
    if (is_help)
        return;

    config_xml_path = config().getString("config-file");
    task_path = config().getString("task-path");
    log_level = config().getString("log-level", "trace");
    is_safe_mode = config().has("safe-mode");

    /// The requested probability is limited to the [0, 1] range.
    if (config().has("copy-fault-probability"))
    {
        const double requested_probability = config().getDouble("copy-fault-probability");
        copy_fault_probability = std::max(std::min(requested_probability, 1.0), 0.0);
    }

    /// Without an explicit base directory the current directory is used.
    if (config().has("base-dir"))
        base_dir = config().getString("base-dir");
    else
        base_dir = Poco::Path::current();

    /// process_id is '<start_timestamp>_<pid>', host_id is '<hostname>#<process_id>'
    const time_t start_timestamp = Poco::Timestamp().epochTime();
    const auto pid = Poco::Process::id();
    process_id = std::to_string(DateLUT::instance().toNumYYYYMMDDhhmmss(start_timestamp)) + "_" + std::to_string(pid);
    host_id = escapeForFileName(getFQDNOrHostName()) + '#' + process_id;

    process_path = Poco::Path(base_dir + "/clickhouse-copier_" + process_id).absolute().toString();
    Poco::File(process_path).createDirectories();

    /// Override variables for BaseDaemon
    if (config().has("log-level"))
        config().setString("logger.level", config().getString("log-level"));

    /// An explicit base directory always redirects both log files into the process directory.
    const bool base_dir_given = config().has("base-dir");

    if (base_dir_given || !config().has("logger.log"))
        config().setString("logger.log", process_path + "/log.log");

    if (base_dir_given || !config().has("logger.errorlog"))
        config().setString("logger.errorlog", process_path + "/log.err.log");

    Base::initialize(self);
}
void ClusterCopierApp::handleHelp(const std::string &, const std::string &)
{
Poco::Util::HelpFormatter helpFormatter(options());
helpFormatter.setCommand(commandName());
helpFormatter.setHeader("Copies tables from one cluster to another");
helpFormatter.setUsage("--config-file <config-file> --task-path <task-path>");
helpFormatter.format(std::cerr);
stopOptionsProcessing();
}
/// Registers the copier specific command line options in addition to the ones of BaseDaemon.
/// Every option is bound to a configuration key of the same name.
void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
{
    Base::defineOptions(options);

    /// Adds an option that takes an argument; option name, argument name and binding are identical.
    const auto add_option_with_argument = [&options](const std::string & name, const std::string & description)
    {
        options.addOption(Poco::Util::Option(name, "", description).argument(name).binding(name));
    };

    add_option_with_argument("task-path", "path to task in ZooKeeper");
    add_option_with_argument("task-file", "path to task file for uploading in ZooKeeper to task-path");
    add_option_with_argument("task-upload-force", "Force upload task-file even node already exists");

    /// A plain flag without an argument.
    options.addOption(Poco::Util::Option("safe-mode", "", "disables ALTER DROP PARTITION in case of errors")
                          .binding("safe-mode"));

    add_option_with_argument(
        "copy-fault-probability",
        "the copying fails with specified probability (used to test partition state recovering)");
    add_option_with_argument("log-level", "sets log level");
    add_option_with_argument(
        "base-dir",
        "base directory for copiers, consecutive copier launches will populate /base-dir/launch_id/* directories");

    /// The help option additionally invokes handleHelp().
    using Me = std::decay_t<decltype(*this)>;
    Poco::Util::Option help_option("help", "", "produce this help message");
    help_option.binding("help").callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp));
    options.addOption(help_option);
}
/// Does the actual work of the application: creates a global Context with an in-memory default
/// database, registers functions/storages/etc., creates a ClusterCopier configured from the
/// options read in initialize(), optionally uploads the task description and runs the copier.
void ClusterCopierApp::mainImpl()
{
/// Status file inside the per-process directory created in initialize().
StatusFile status_file(process_path + "/status");
ThreadStatus thread_status;
auto log = &logger();
LOG_INFO(log, "Starting clickhouse-copier ("
<< "id " << process_id << ", "
<< "host_id " << host_id << ", "
<< "path " << process_path << ", "
<< "revision " << ClickHouseRevision::get() << ")");
auto context = std::make_unique<Context>(Context::createGlobal());
context->makeGlobalContext();
/// shutdown() is executed when this scope is left.
SCOPE_EXIT(context->shutdown());
context->setConfig(loaded_config.configuration);
context->setApplicationType(Context::ApplicationType::LOCAL);
context->setPath(process_path);
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
/// In-memory database that is used as the current database; its name is also passed to the copier.
static const std::string default_database = "_local";
context->addDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
context->setCurrentDatabase(default_database);
/// Initialize query scope just in case.
CurrentThread::QueryScope query_scope(*context);
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, *context);
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);
/// The task description is uploaded only when --task-file was given.
auto task_file = config().getString("task-file", "");
if (!task_file.empty())
copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false));
copier->init();
copier->process(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(context->getSettingsRef()));
/// Reset ZooKeeper before removing ClusterCopier.
/// Otherwise zookeeper watch can call callback which use already removed ClusterCopier object.
/// NOTE(review): if init() or process() throws, this call is skipped and `copier` is destroyed
/// during unwinding before the context is shut down - confirm that this is acceptable.
context->resetZooKeeper();
}
/// Application entry point called by the Poco framework; the argument list is ignored.
/// Returns 0 on success or when only help was requested. If mainImpl() throws, the exception is
/// logged and its code is returned (-1 when the code is zero).
int ClusterCopierApp::main(const std::vector<std::string> &)
{
    if (!is_help)
    {
        try
        {
            mainImpl();
        }
        catch (...)
        {
            tryLogCurrentException(&Poco::Logger::root(), __PRETTY_FUNCTION__);

            const auto error_code = getCurrentExceptionCode();
            if (error_code)
                return error_code;
            return -1;
        }
    }

    return 0;
}
}
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wmissing-declarations"
/// Entry point of the copier program: runs a ClusterCopierApp with the given command line.
/// Returns the application's exit code. If an exception escapes, its message is printed to
/// stderr and its code is returned (-1 when the code is zero).
int mainEntryClickHouseClusterCopier(int argc, char ** argv)
{
    int exit_code = 0;

    try
    {
        DB::ClusterCopierApp app;
        exit_code = app.run(argc, argv);
    }
    catch (...)
    {
        std::cerr << DB::getCurrentExceptionMessage(true) << "\n";

        const auto error_code = DB::getCurrentExceptionCode();
        exit_code = error_code ? error_code : -1;
    }

    return exit_code;
}

View File

@ -0,0 +1,90 @@
#pragma once
#include <Poco/Util/ServerApplication.h>
#include <daemon/BaseDaemon.h>
#include "ClusterCopier.h"
/* clickhouse cluster copier util
* Copies tables data from one cluster to new tables of other (possibly the same) cluster in distributed fault-tolerant manner.
*
* See overview in the docs: docs/en/utils/clickhouse-copier.md
*
* Implementation details:
*
* cluster-copier workers pull each partition of each shard of the source cluster and push it to the destination cluster through
* Distributed table (to perform data resharding). So, worker job is a partition of a source shard.
* A job has three states: Active, Finished and Abandoned. Abandoned means that worker died and did not finish the job.
*
* If an error occurred during the copying (a worker failed or a worker did not finish the INSERT), then the whole partition (on
* all destination servers) should be dropped and refilled. So, copying entity is a partition of all destination shards.
* If a failure is detected, a special /is_dirty node is created in ZooKeeper signalling that other workers copying the same partition
* should stop; after that, a refilling procedure should start.
*
* ZooKeeper task node has the following structure:
* /task/path_root - path passed in --task-path parameter
* /description - contains user-defined XML config of the task
* /task_active_workers - contains ephemeral nodes of all currently active workers, used to implement max_workers limitation
* /server_fqdn#timestamp_PID - cluster-copier worker ID
* ...
* /tables - directory with table tasks
* /cluster.db.table1 - directory of table_hits task
* /partition1 - directory for partition1
* /shards - directory for source cluster shards
* /1 - worker job for the first shard of partition1 of table test.hits
* Contains info about current status (Active or Finished) and worker ID.
* /2
* ...
* /partition_active_workers
* /1 - for each job in /shards a corresponding ephemeral node created in /partition_active_workers
* It is used to detect Abandoned jobs (if there is Active node in /shards and there is no node in
* /partition_active_workers).
* Also, it is used to track active workers in the partition (when we need to refill the partition we do
* not DROP PARTITION while there are active workers)
* /2
* ...
* /is_dirty - the node is set if some worker detected that an error occurred (an INSERT failed or an Abandoned node was
* detected). If the node appears, workers in this partition should stop and start the partition cleaning and
* refilling procedure.
* During this procedure a single 'cleaner' worker is selected. The worker waits for stopping all partition
* workers, removes /shards node, executes DROP PARTITION on each destination node and removes /is_dirty node.
* /cleaner- An ephemeral node used to select 'cleaner' worker. Contains ID of the worker.
* /cluster.db.table2
* ...
*/
namespace DB
{
/// Command line application of the copier, built on top of BaseDaemon.
/// It parses the options, prepares a per-process working directory and runs a ClusterCopier.
class ClusterCopierApp : public BaseDaemon
{
public:
/// Reads the option values from the configuration and prepares logging/working directory.
void initialize(Poco::Util::Application & self) override;
/// Callback of the "help" option: prints usage information.
void handleHelp(const std::string &, const std::string &);
/// Registers the command line options of the copier.
void defineOptions(Poco::Util::OptionSet & options) override;
/// Runs mainImpl() and converts exceptions into a return code.
int main(const std::vector<std::string> &) override;
private:
using Base = BaseDaemon;
/// The actual work; may throw.
void mainImpl();
/// Value of the "config-file" option.
std::string config_xml_path;
/// Value of the "task-path" option (path to the task in ZooKeeper).
std::string task_path;
std::string log_level = "trace";
/// Set when the "safe-mode" option is present.
bool is_safe_mode = false;
/// Value of "copy-fault-probability"; initialize() limits it to [0, 1].
double copy_fault_probability = 0;
/// Set when the "help" option is present; in this case main() does nothing.
bool is_help = false;
/// Value of "base-dir" or the current directory.
std::string base_dir;
/// Absolute path "<base_dir>/clickhouse-copier_<process_id>".
std::string process_path;
/// "<start_timestamp>_<pid>"
std::string process_id;
/// "<escaped hostname>#<process_id>"
std::string host_id;
};
}

View File

@ -0,0 +1,16 @@
#pragma once
#include "Aliases.h"
namespace DB
{
/// Contains info about all shards that contain a partition.
/// All counters start at zero; presumably they are accumulated while the partition is copied.
struct ClusterPartition
{
    double elapsed_time_seconds{0};
    UInt64 bytes_copied{0};
    UInt64 rows_copied{0};
    UInt64 blocks_copied{0};

    UInt64 total_tries{0};
};
}

View File

@ -0,0 +1,129 @@
#include "Internals.h"
namespace DB {
/// Parses the given XML text and returns it as a Poco XML configuration.
[[maybe_unused]] ConfigurationPtr getConfigurationFromXMLString(const std::string & xml_data)
{
    std::stringstream xml_stream(xml_data);
    Poco::XML::InputSource source{xml_stream};

    ConfigurationPtr parsed_config(new Poco::Util::XMLConfiguration{&source});
    return parsed_config;
}
/// Returns "database.table" with both parts back-quoted if needed,
/// or just the (back-quoted if needed) table name when the database name is empty.
String getQuotedTable(const String & database, const String & table)
{
    const String quoted_table = backQuoteIfNeed(table);

    if (database.empty())
        return quoted_table;

    return backQuoteIfNeed(database) + "." + quoted_table;
}
/// Same as getQuotedTable(database, table) for a (database, table) pair.
String getQuotedTable(const DatabaseAndTableName & db_and_table)
{
    const auto & [database, table] = db_and_table;
    return getQuotedTable(database, table);
}
/// Builds the AST of 'ENGINE = Distributed(cluster, db, table[, sharding_key])'.
/// The cluster name is passed as a literal, database and table as identifiers;
/// the sharding key is appended only when sharding_key_ast is not null.
std::shared_ptr<ASTStorage> createASTStorageDistributed(
    const String & cluster_name, const String & database, const String & table,
    const ASTPtr & sharding_key_ast)
{
    auto engine_args = std::make_shared<ASTExpressionList>();
    auto & arg_list = engine_args->children;

    arg_list.emplace_back(std::make_shared<ASTLiteral>(cluster_name));
    arg_list.emplace_back(std::make_shared<ASTIdentifier>(database));
    arg_list.emplace_back(std::make_shared<ASTIdentifier>(table));
    if (sharding_key_ast)
        arg_list.emplace_back(sharding_key_ast);

    auto distributed_engine = std::make_shared<ASTFunction>();
    distributed_engine->name = "Distributed";
    distributed_engine->arguments = engine_args;

    auto result = std::make_shared<ASTStorage>();
    result->set(result->engine, distributed_engine);
    return result;
}
/// Wraps the stream into a SquashingBlockInputStream whose both size limits are
/// set to the maximum value of size_t.
BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr & stream)
{
    constexpr size_t unlimited = std::numeric_limits<size_t>::max();
    return std::make_shared<SquashingBlockInputStream>(stream, unlimited, unlimited);
}
/// Squashes the stream (see squashStreamIntoOneBlock) and returns the result of a single read().
Block getBlockWithAllStreamData(const BlockInputStreamPtr & stream)
{
    const BlockInputStreamPtr squashed = squashStreamIntoOneBlock(stream);
    return squashed->read();
}
/// Returns true if the storage definition has at least one of the
/// PARTITION BY, ORDER BY or SAMPLE BY clauses set.
bool isExtendedDefinitionStorage(const ASTPtr & storage_ast)
{
    const auto & storage = storage_ast->as<ASTStorage &>();

    if (storage.partition_by)
        return true;
    if (storage.order_by)
        return true;
    if (storage.sample_by)
        return true;

    return false;
}
/** Returns the partition key expression of a *MergeTree storage definition.
  *
  * - Extended definition (PARTITION BY / ORDER BY / SAMPLE BY present):
  *   a clone of the PARTITION BY expression, or the literal 'all' when there is no PARTITION BY.
  * - Old definition (everything is passed as engine arguments):
  *   toYYYYMM(<date column>). The date column is the first engine argument for MergeTree and
  *   the third one for Replicated*MergeTree (it follows the ZooKeeper path and the replica name).
  *
  * Throws BAD_ARGUMENTS if the engine is not a *MergeTree one or if, for the old definition,
  * the engine has no arguments or too few of them.
  */
ASTPtr extractPartitionKey(const ASTPtr & storage_ast)
{
    String storage_str = queryToString(storage_ast);

    const auto & storage = storage_ast->as<ASTStorage &>();
    const auto & engine = storage.engine->as<ASTFunction &>();

    if (!endsWith(engine.name, "MergeTree"))
    {
        throw Exception(
            "Unsupported engine was specified in " + storage_str + ", only *MergeTree engines are supported",
            ErrorCodes::BAD_ARGUMENTS);
    }

    if (isExtendedDefinitionStorage(storage_ast))
    {
        if (storage.partition_by)
            return storage.partition_by->clone();

        static const char * all = "all";
        return std::make_shared<ASTLiteral>(Field(all, strlen(all)));
    }

    const bool is_replicated = startsWith(engine.name, "Replicated");

    /// The index of the date column and the minimal number of arguments are derived from each
    /// other, so the bounds check below always covers the element that is accessed afterwards.
    const size_t date_arg_index = is_replicated ? 2 : 0;
    const size_t min_args = date_arg_index + 1;

    if (!engine.arguments)
        throw Exception("Expected arguments in " + storage_str, ErrorCodes::BAD_ARGUMENTS);

    ASTPtr arguments_ast = engine.arguments->clone();
    ASTs & arguments = arguments_ast->children;

    if (arguments.size() < min_args)
        throw Exception("Expected at least " + toString(min_args) + " arguments in " + storage_str,
                        ErrorCodes::BAD_ARGUMENTS);

    return makeASTFunction("toYYYYMM", arguments[date_arg_index]->clone());
}
/// Computes the priority of a shard from its replicas:
///  - is_remote is 0 if at least one replica resolves to a local address, otherwise 1;
///  - hostname_difference is the smallest getHostNameDifference() between local_hostname and a replica;
///  - random is copied from the argument.
/// For an empty replica list a default-constructed ShardPriority is returned.
[[maybe_unused]] ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std::string & local_hostname, UInt8 random)
{
    ShardPriority res;

    if (replicas.empty())
        return res;

    /// Stops at the first local replica.
    const bool has_local_replica = std::any_of(replicas.begin(), replicas.end(), [](const auto & replica)
    {
        return isLocalAddress(DNSResolver::instance().resolveHost(replica.host_name));
    });
    res.is_remote = has_local_replica ? 0 : 1;

    size_t smallest_difference = std::numeric_limits<size_t>::max();
    for (const auto & replica : replicas)
        smallest_difference = std::min(getHostNameDifference(local_hostname, replica.host_name), smallest_difference);
    res.hostname_difference = smallest_difference;

    res.random = random;
    return res;
}
}

View File

@ -0,0 +1,176 @@
#pragma once
#include <chrono>
#include <optional>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/Logger.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/FormattingChannel.h>
#include <Poco/PatternFormatter.h>
#include <Poco/UUIDGenerator.h>
#include <Poco/File.h>
#include <Poco/Process.h>
#include <Poco/FileChannel.h>
#include <Poco/SplitterChannel.h>
#include <Poco/Util/HelpFormatter.h>
#include <boost/algorithm/string.hpp>
#include <common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/getFQDNOrHostName.h>
#include <Common/isLocalAddress.h>
#include <Common/typeid_cast.h>
#include <Common/ClickHouseRevision.h>
#include <Common/formatReadable.h>
#include <Common/DNSResolver.h>
#include <Common/CurrentThread.h>
#include <Common/escapeForFileName.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/ThreadStatus.h>
#include <Client/Connection.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterExistsQuery.h>
#include <Interpreters/InterpreterShowCreateQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Formats/FormatSettings.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadBufferFromFile.h>
#include <Functions/registerFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Storages/registerStorages.h>
#include <Storages/StorageDistributed.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <Databases/DatabaseMemory.h>
#include <Common/StatusFile.h>
#include "Aliases.h"
namespace DB {
/// Error codes used by the copier sources. They are only declared here (extern).
namespace ErrorCodes
{
    extern const int NO_ZOOKEEPER;
    extern const int BAD_ARGUMENTS;
    extern const int UNKNOWN_TABLE;
    extern const int UNFINISHED;
    extern const int UNKNOWN_ELEMENT_IN_CONFIG;
    /// Thrown by TaskStateWithOwner::fromString in this header, so it is declared explicitly
    /// instead of relying on a declaration that happens to come from an included header.
    extern const int LOGICAL_ERROR;
}
/// Parses XML text into a configuration object.
[[maybe_unused]] ConfigurationPtr getConfigurationFromXMLString(const std::string &xml_data);
/// Returns the table name back-quoted if needed, prefixed with "<database>." unless database is empty.
[[maybe_unused]] String getQuotedTable(const String &database, const String &table);
/// Same as above for a (database, table) pair.
[[maybe_unused]] String getQuotedTable(const DatabaseAndTableName &db_and_table);
/// State stored in TaskStateWithOwner. The numeric value is what gets serialized.
enum class TaskState {
Started = 0,
Finished,
/// Default state of TaskStateWithOwner; TaskStateWithOwner::fromString rejects
/// serialized values that are greater than or equal to it.
Unknown
};
/// Used to mark status of shard partition tasks
struct TaskStateWithOwner {
TaskStateWithOwner() = default;
TaskStateWithOwner(TaskState state_, const String &owner_) : state(state_), owner(owner_) {}
TaskState state{TaskState::Unknown};
String owner;
static String getData(TaskState state, const String &owner) {
return TaskStateWithOwner(state, owner).toString();
}
String toString() {
WriteBufferFromOwnString wb;
wb << static_cast<UInt32>(state) << "\n" << escape << owner;
return wb.str();
}
static TaskStateWithOwner fromString(const String &data) {
ReadBufferFromString rb(data);
TaskStateWithOwner res;
UInt32 state;
rb >> state >> "\n" >> escape >> res.owner;
if (state >= static_cast<int>(TaskState::Unknown))
throw Exception("Unknown state " + data, ErrorCodes::LOGICAL_ERROR);
res.state = static_cast<TaskState>(state);
return res;
}
};
/// Ordering key of a shard, filled by getReplicasPriority().
struct ShardPriority
{
    UInt8 is_remote = 1;
    size_t hostname_difference = 0;
    UInt8 random = 0;

    /// Returns true if `current` is lexicographically less than `other`
    /// by (is_remote, hostname_difference, random).
    static bool greaterPriority(const ShardPriority & current, const ShardPriority & other)
    {
        const auto as_key = [](const ShardPriority & priority)
        {
            return std::tie(priority.is_remote, priority.hostname_difference, priority.random);
        };

        return as_key(current) < as_key(other);
    }
};
/// Execution status of a task
/// Returned by ClusterCopier::tryProcessPartitionTask and ClusterCopier::processPartitionTaskImpl.
/// NOTE(review): the exact meaning of each value is defined by those functions - not visible here.
enum class PartitionTaskStatus
{
Active,
Finished,
Error,
};
/// Bundles a result code with the requests and responses of one ZooKeeper multi-operation
/// (presumably filled by the code that executes the transaction - TODO confirm).
struct MultiTransactionInfo {
/// NOTE(review): has no default initializer, the value is indeterminate until assigned.
int32_t code;
Coordination::Requests requests;
Coordination::Responses responses;
};
// Creates AST representing 'ENGINE = Distributed(cluster, db, table, [sharding_key])
// The sharding key is added only if sharding_key_ast is not null.
[[maybe_unused]] std::shared_ptr<ASTStorage> createASTStorageDistributed(
const String &cluster_name, const String &database, const String &table,
const ASTPtr &sharding_key_ast = nullptr);
/// Wraps the stream into a SquashingBlockInputStream with both limits set to the maximum of size_t.
[[maybe_unused]] BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr &stream);
/// Returns the result of one read() from the squashed stream.
[[maybe_unused]] Block getBlockWithAllStreamData(const BlockInputStreamPtr &stream);
/// True if the storage definition has PARTITION BY, ORDER BY or SAMPLE BY.
[[maybe_unused]] bool isExtendedDefinitionStorage(const ASTPtr &storage_ast);
/// Returns the partition key expression of a *MergeTree storage definition;
/// throws BAD_ARGUMENTS for other engines.
[[maybe_unused]] ASTPtr extractPartitionKey(const ASTPtr &storage_ast);
/// Computes the ShardPriority of a shard from its replicas, the local hostname and a random value.
[[maybe_unused]] ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std::string & local_hostname, UInt8 random);
}

View File

@ -0,0 +1,68 @@
#pragma once
#include "Aliases.h"
namespace DB
{

/// Just destination partition of a shard.
/// Holds a reference to the owning TaskShard and the partition name; all methods only build
/// ZooKeeper paths that are located under the partition path of the table task.
struct ShardPartition
{
    ShardPartition(TaskShard & parent, const String & name_quoted_) : task_shard(parent), name(name_quoted_) {}

    String getPartitionPath() const;
    String getPartitionCleanStartPath() const;
    String getCommonPartitionIsDirtyPath() const;
    String getCommonPartitionIsCleanedPath() const;
    String getPartitionActiveWorkersPath() const;
    String getActiveWorkerPath() const;
    String getPartitionShardsPath() const;
    String getShardStatusPath() const;

    TaskShard & task_shard;
    String name;
};

/// <partition path>/clean_start
inline String ShardPartition::getPartitionCleanStartPath() const
{
    String path = getPartitionPath();
    path += "/clean_start";
    return path;
}

/// The partition path is provided by the table task that owns the shard.
inline String ShardPartition::getPartitionPath() const
{
    const auto & table = task_shard.task_table;
    return table.getPartitionPath(name);
}

/// schema: /<root...>/tables/<table>/<partition>/shards/<shard>
/// e.g. /root/table_test.hits/201701/shards/1
inline String ShardPartition::getShardStatusPath() const
{
    const String shard_number = toString(task_shard.numberInCluster());
    return getPartitionShardsPath() + "/" + shard_number;
}

/// <partition path>/shards
inline String ShardPartition::getPartitionShardsPath() const
{
    String path = getPartitionPath();
    path += "/shards";
    return path;
}

/// <partition path>/partition_active_workers
inline String ShardPartition::getPartitionActiveWorkersPath() const
{
    String path = getPartitionPath();
    path += "/partition_active_workers";
    return path;
}

/// <partition path>/partition_active_workers/<shard number in cluster>
inline String ShardPartition::getActiveWorkerPath() const
{
    const String shard_number = toString(task_shard.numberInCluster());
    return getPartitionActiveWorkersPath() + "/" + shard_number;
}

/// <partition path>/is_dirty
inline String ShardPartition::getCommonPartitionIsDirtyPath() const
{
    String path = getPartitionPath();
    path += "/is_dirty";
    return path;
}

/// <partition path>/is_dirty/cleaned
inline String ShardPartition::getCommonPartitionIsCleanedPath() const
{
    String path = getCommonPartitionIsDirtyPath();
    path += "/cleaned";
    return path;
}

}

View File

@ -0,0 +1,96 @@
#pragma once
#include "Aliases.h"
namespace DB
{
/// Root of the task description: common settings plus the list of table tasks.
struct TaskCluster
{
TaskCluster(const String & task_zookeeper_path_, const String & default_local_database_)
: task_zookeeper_path(task_zookeeper_path_), default_local_database(default_local_database_) {}
/// Checks that "<prefix>remote_servers" exists in the config (throws BAD_ARGUMENTS otherwise)
/// and appends one table task per key found under "<prefix>tables".
void loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key = "");
/// Set (or update) settings and max_workers param
void reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key = "");
/// Base node for all tasks. Its structure:
/// workers/ - directory with active workers (amount of them is less or equal max_workers)
/// description - node with task configuration
/// table_table1/ - directories with per-partition copying status
/// NOTE(review): ClusterCopier::getWorkersPath() uses "/task_active_workers" under this path,
/// so the node names listed above look outdated - confirm.
String task_zookeeper_path;
/// Database used to create temporary Distributed tables
String default_local_database;
/// Limits number of simultaneous workers
UInt64 max_workers = 0;
/// Base settings for pull and push
Settings settings_common;
/// Settings used to fetch data
Settings settings_pull;
/// Settings used to insert data
Settings settings_push;
/// Config key of the clusters section: "<prefix>remote_servers" (set by loadTasks).
String clusters_prefix;
/// Subtasks
TasksTable table_tasks;
std::random_device random_device;
/// NOTE(review): no explicit seeding is visible in this header.
pcg64 random_engine;
};
/// Remembers the clusters configuration key and appends one TaskTable to `table_tasks`
/// for every key found under `<base_key>.tables`.
/// Throws BAD_ARGUMENTS when `<base_key>.remote_servers` is missing from the configuration.
inline void DB::TaskCluster::loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key)
{
    String prefix;
    if (!base_key.empty())
        prefix = base_key + ".";

    clusters_prefix = prefix + "remote_servers";
    if (!config.has(clusters_prefix))
        throw Exception("You should specify list of clusters in " + clusters_prefix, ErrorCodes::BAD_ARGUMENTS);

    const String tables_prefix = prefix + "tables";

    Poco::Util::AbstractConfiguration::Keys table_names;
    config.keys(tables_prefix, table_names);

    for (const auto & table_name : table_names)
        table_tasks.emplace_back(*this, config, tables_prefix, table_name);
}
/// Re-reads `max_workers` and the three settings sections from the configuration.
/// `settings_pull` and `settings_push` both start as a copy of the common settings and are then
/// refined by their own sections; finally a few settings are forced or given copier defaults.
inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key)
{
    String prefix;
    if (!base_key.empty())
        prefix = base_key + ".";

    max_workers = config.getUInt64(prefix + "max_workers");

    /// Applies `<prefix><section>` to `target` if the section exists in the configuration
    const auto load_section_if_present = [&config, &prefix] (Settings & target, const char * section)
    {
        const String section_key = prefix + section;
        if (config.has(section_key))
            target.loadSettingsFromConfig(section_key, config);
    };

    settings_common = Settings();
    load_section_if_present(settings_common, "settings");

    settings_pull = settings_common;
    load_section_if_present(settings_pull, "settings_pull");

    settings_push = settings_common;
    load_section_if_present(settings_push, "settings_push");

    /// Keeps a value that was explicitly changed, otherwise installs the given default
    auto set_default_value = [] (auto && setting, auto && default_value)
    {
        setting = setting.changed ? setting.value : default_value;
    };

    /// Override important settings
    settings_pull.readonly = 1;
    settings_push.insert_distributed_sync = 1;

    set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME);
    set_default_value(settings_pull.max_threads, 1);
    set_default_value(settings_pull.max_block_size, 8192UL);
    set_default_value(settings_pull.preferred_block_size_bytes, 0);
    set_default_value(settings_push.insert_distributed_timeout, 0);
}
}

View File

@ -0,0 +1,269 @@
#pragma once
#include "Aliases.h"
#include "Internals.h"
#include "ClusterPartition.h"
namespace DB
{
struct TaskShard;
/// Description of one table to copy: source and destination, parsed engine / sharding / filter
/// expressions, the shards to pull from and the per-partition bookkeeping.
struct TaskTable
{
    /// Reads the table description from `<prefix>.<table_key>.*` in the configuration
    TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix,
              const String & table_key);

    /// Owning task description
    TaskCluster & task_cluster;

    /// ZooKeeper paths of the partition node and of its `is_dirty`, `is_dirty/cleaned` and `shards` children
    String getPartitionPath(const String & partition_name) const;
    String getPartitionIsDirtyPath(const String & partition_name) const;
    String getPartitionIsCleanedPath(const String & partition_name) const;
    String getPartitionTaskStatusPath(const String & partition_name) const;

    /// Key of this table inside the `tables` section of the configuration
    String name_in_config;

    /// Used as task ID
    String table_id;

    /// Source cluster and table
    String cluster_pull_name;
    DatabaseAndTableName table_pull;

    /// Destination cluster and table
    String cluster_push_name;
    DatabaseAndTableName table_push;

    /// Storage of destination table
    String engine_push_str;
    ASTPtr engine_push_ast;
    ASTPtr engine_push_partition_key_ast;

    /// A Distributed table definition used to split data
    String sharding_key_str;
    ASTPtr sharding_key_ast;
    ASTPtr engine_split_ast;

    /// Additional WHERE expression to filter input data
    String where_condition_str;
    ASTPtr where_condition_ast;

    /// Resolved clusters
    ClusterPtr cluster_pull;
    ClusterPtr cluster_push;

    /// Filter partitions that should be copied
    bool has_enabled_partitions = false;
    Strings enabled_partitions;
    NameSet enabled_partitions_set;

    /// Prioritized list of shards
    TasksShard all_shards;
    TasksShard local_shards;

    ClusterPartitions cluster_partitions;
    NameSet finished_cluster_partitions;

    /// Partition names to process in user-specified order
    Strings ordered_partition_names;

    /// Returns the bookkeeping entry of the given partition; throws LOGICAL_ERROR if it is unknown
    ClusterPartition & getClusterPartition(const String & partition_name)
    {
        const auto found = cluster_partitions.find(partition_name);
        if (found != cluster_partitions.end())
            return found->second;

        throw Exception("There are no cluster partition " + partition_name + " in " + table_id, ErrorCodes::LOGICAL_ERROR);
    }

    /// Progress counters of the copying
    Stopwatch watch;
    UInt64 bytes_copied = 0;
    UInt64 rows_copied = 0;

    /// Fills `all_shards` (sorted by priority) and `local_shards` from `cluster_pull`
    template <typename RandomEngine>
    void initShards(RandomEngine && random_engine);
};
/// One shard of the source cluster, as seen by the table task that pulls data from it.
struct TaskShard
{
    TaskShard(TaskTable & parent, const ShardInfo & info_)
        : task_table(parent)
        , info(info_)
    {
    }

    /// Table task this shard belongs to
    TaskTable & task_table;

    /// Copy of the cluster's description of the shard
    ShardInfo info;

    /// Shard number as stored in the shard info
    UInt32 numberInCluster() const
    {
        return info.shard_num;
    }

    /// Shard number minus one; used to index the cluster's per-shard address list
    UInt32 indexInCluster() const
    {
        return info.shard_num - 1;
    }

    String getDescription() const;

    String getHostNameExample() const;

    /// Used to sort clusters by their proximity
    ShardPriority priority;

    /// Column with unique destination partitions (computed from engine_push_partition_key expr.) in the shard
    ColumnWithTypeAndName partition_key_column;

    /// There is a task for each destination partition
    TasksPartition partition_tasks;

    /// Which partitions have been checked for existence.
    /// If a partition from this list exists, it is in partition_tasks
    std::set<String> checked_partitions;

    /// Last CREATE TABLE query of the table of the shard
    ASTPtr current_pull_table_create_query;

    /// Internal distributed tables
    DatabaseAndTableName table_read_shard;
    DatabaseAndTableName table_split_shard;
};
/// ZooKeeper node of a partition: <root>/tables/<table_id>/<escaped partition name>
inline String TaskTable::getPartitionPath(const String & partition_name) const
{
    String path = task_cluster.task_zookeeper_path;    /// root
    path += "/tables/";
    path += table_id;                                  /// e.g. dst_cluster.merge.hits
    path += "/";
    path += escapeForFileName(partition_name);         /// e.g. 201701
    return path;
}
/// Path of the `is_dirty` node under the partition node.
inline String TaskTable::getPartitionIsDirtyPath(const String & partition_name) const
{
    const String partition_path = getPartitionPath(partition_name);
    return partition_path + "/is_dirty";
}
/// Path of the `cleaned` node, which lives below the partition's `is_dirty` node.
inline String TaskTable::getPartitionIsCleanedPath(const String & partition_name) const
{
    const String is_dirty_path = getPartitionIsDirtyPath(partition_name);
    return is_dirty_path + "/cleaned";
}
/// Path of the `shards` directory under the partition node.
inline String TaskTable::getPartitionTaskStatusPath(const String & partition_name) const
{
    const String partition_path = getPartitionPath(partition_name);
    return partition_path + "/shards";
}
/// Reads the description of one table from `<prefix_>.<table_key>.*`:
/// source/destination cluster and table names, the destination engine, the sharding key,
/// an optional WHERE filter and an optional list of enabled partitions.
/// All keys except `where_condition` and `enabled_partitions` are read without a default value.
inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix_,
                            const String & table_key)
    : task_cluster(parent)
{
    const String table_prefix = prefix_ + "." + table_key + ".";

    /// Reads the key `<table_prefix><name>` without a default value
    const auto read_required = [&config, &table_prefix] (const char * name)
    {
        return config.getString(table_prefix + name);
    };

    name_in_config = table_key;

    cluster_pull_name = read_required("cluster_pull");
    cluster_push_name = read_required("cluster_push");

    table_pull.first = read_required("database_pull");
    table_pull.second = read_required("table_pull");

    table_push.first = read_required("database_push");
    table_push.second = read_required("table_push");

    /// Used as node name in ZooKeeper
    table_id = escapeForFileName(cluster_push_name)
        + "." + escapeForFileName(table_push.first)
        + "." + escapeForFileName(table_push.second);

    /// Destination engine and the partition key extracted from it
    engine_push_str = read_required("engine");
    {
        ParserStorage storage_parser;
        engine_push_ast = parseQuery(storage_parser, engine_push_str, 0);
        engine_push_partition_key_ast = extractPartitionKey(engine_push_ast);
    }

    /// Sharding key and the Distributed engine definition built around it
    sharding_key_str = read_required("sharding_key");
    {
        ParserExpressionWithOptionalAlias expression_parser(false);
        sharding_key_ast = parseQuery(expression_parser, sharding_key_str, 0);
        engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second, sharding_key_ast);
    }

    /// Optional filter; an absent key is treated as an empty condition
    where_condition_str = config.getString(table_prefix + "where_condition", "");
    if (!where_condition_str.empty())
    {
        ParserExpressionWithOptionalAlias expression_parser(false);
        where_condition_ast = parseQuery(expression_parser, where_condition_str, 0);

        /// Keep the canonical form of the expression
        where_condition_str = queryToString(where_condition_ast);
    }

    const String enabled_partitions_prefix = table_prefix + "enabled_partitions";
    has_enabled_partitions = config.has(enabled_partitions_prefix);
    if (!has_enabled_partitions)
        return;

    Strings keys;
    config.keys(enabled_partitions_prefix, keys);

    if (keys.empty())
    {
        /// The element has no children: its text is a whitespace-separated list of partitions
        String partitions_str = config.getString(table_prefix + "enabled_partitions");
        boost::trim_if(partitions_str, isWhitespaceASCII);
        boost::split(enabled_partitions, partitions_str, isWhitespaceASCII, boost::token_compress_on);
    }
    else
    {
        /// The element is a sequence of <partition>...</partition> children
        for (const String & key : keys)
        {
            if (!startsWith(key, "partition"))
                throw Exception("Unknown key " + key + " in " + enabled_partitions_prefix, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);

            enabled_partitions.emplace_back(config.getString(enabled_partitions_prefix + "." + key));
        }
    }

    /// Mirror the list into a set
    for (const String & partition : enabled_partitions)
        enabled_partitions_set.insert(partition);
}
template<typename RandomEngine>
inline void TaskTable::initShards(RandomEngine &&random_engine) {
const String & fqdn_name = getFQDNOrHostName();
std::uniform_int_distribution<UInt8> get_urand(0, std::numeric_limits<UInt8>::max());
// Compute the priority
for (auto &shard_info : cluster_pull->getShardsInfo()) {
TaskShardPtr task_shard = std::make_shared<TaskShard>(*this, shard_info);
const auto &replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster());
task_shard->priority = getReplicasPriority(replicas, fqdn_name, get_urand(random_engine));
all_shards.emplace_back(task_shard);
}
// Sort by priority
std::sort(all_shards.begin(), all_shards.end(),
[](const TaskShardPtr &lhs, const TaskShardPtr &rhs) {
return ShardPriority::greaterPriority(lhs->priority, rhs->priority);
});
// Cut local shards
auto it_first_remote = std::lower_bound(all_shards.begin(), all_shards.end(), 1,
[](const TaskShardPtr &lhs, UInt8 is_remote) {
return lhs->priority.is_remote < is_remote;
});
local_shards.assign(all_shards.begin(), it_first_remote);
}
/// Human-readable description of the shard: its number, one replica address,
/// the pull table and the name of the pull cluster.
inline String DB::TaskShard::getDescription() const
{
    std::stringstream description;

    description << "N" << numberInCluster();
    description << " (having a replica " << getHostNameExample();
    description << ", pull table " << getQuotedTable(task_table.table_pull);
    description << " of cluster " << task_table.cluster_pull_name << ")";

    return description.str();
}
/// Readable address of the first replica of this shard in the pull cluster.
/// Both lookups use at(), so a missing shard or an empty replica list throws.
inline String DB::TaskShard::getHostNameExample() const
{
    const auto & shards_addresses = task_table.cluster_pull->getShardsAddresses();
    const auto & shard_replicas = shards_addresses.at(indexInCluster());
    return shard_replicas.at(0).readableString();
}
}

View File

@ -0,0 +1,224 @@
#pragma once
/** Allows to compare two incremental counters of type UInt32 in presence of possible overflow.
* We assume that we compare values that are not too far away.
* For example, when we increment 0xFFFFFFFF, we get 0. So, 0xFFFFFFFF is less than 0.
*/
class WrappingUInt32
{
public:
UInt32 value;
explicit WrappingUInt32(UInt32 _value)
: value(_value)
{}
bool operator<(const WrappingUInt32 & other) const
{
return value != other.value && *this <= other;
}
bool operator<=(const WrappingUInt32 & other) const
{
const UInt32 HALF = 1 << 31;
return (value <= other.value && other.value - value < HALF)
|| (value > other.value && value - other.value > HALF);
}
bool operator==(const WrappingUInt32 & other) const
{
return value == other.value;
}
};
/** Conforming Zxid definition.
* cf. https://github.com/apache/zookeeper/blob/631d1b284f0edb1c4f6b0fb221bf2428aec71aaa/zookeeper-docs/src/main/resources/markdown/zookeeperInternals.md#guarantees-properties-and-definitions
*
* But it is better to read this: https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html
*
* Actually here is the definition of Zxid.
* Every change to the ZooKeeper state receives a stamp in the form of a zxid (ZooKeeper Transaction Id).
* This exposes the total ordering of all changes to ZooKeeper. Each change will have a unique zxid
* and if zxid1 is smaller than zxid2 then zxid1 happened before zxid2.
*/
class Zxid
{
public:
    /// Upper 32 bits of the zxid
    WrappingUInt32 epoch;
    /// Lower 32 bits of the zxid
    WrappingUInt32 counter;

    explicit Zxid(UInt64 _zxid)
        : epoch(static_cast<UInt32>(_zxid >> 32))
        , counter(static_cast<UInt32>(_zxid))
    {}

    /// Ordered by epoch first; within the same epoch, by counter.
    /// Both parts are compared with wrap-around semantics.
    bool operator<=(const Zxid & other) const
    {
        if (epoch == other.epoch)
            return counter <= other.counter;

        return epoch < other.epoch;
    }

    bool operator==(const Zxid & other) const
    {
        if (!(epoch == other.epoch))
            return false;

        return counter == other.counter;
    }
};
/* When multiple ClusterCopiers discover that the target partition is not empty,
* they will attempt to clean up this partition before proceeding to copying.
*
* Instead of purging is_dirty, the history of cleaning work is preserved and partition hygiene is established
* based on a happens-before relation between the events.
* This relation is encoded by LogicalClock based on the mzxid of the is_dirty ZNode and is_dirty/cleaned.
* The fact of the partition hygiene is encoded by CleanStateClock.
*
* For you to know what mzxid means:
*
* ZooKeeper Stat Structure:
* The Stat structure for each znode in ZooKeeper is made up of the following fields:
*
* -- czxid
* The zxid of the change that caused this znode to be created.
*
* -- mzxid
* The zxid of the change that last modified this znode.
*
* -- ctime
* The time in milliseconds from epoch when this znode was created.
*
* -- mtime
* The time in milliseconds from epoch when this znode was last modified.
*
* -- version
* The number of changes to the data of this znode.
*
* -- cversion
* The number of changes to the children of this znode.
*
* -- aversion
* The number of changes to the ACL of this znode.
*
* -- ephemeralOwner
* The session id of the owner of this znode if the znode is an ephemeral node.
* If it is not an ephemeral node, it will be zero.
*
* -- dataLength
* The length of the data field of this znode.
*
* -- numChildren
* The number of children of this znode.
* */
class LogicalClock
{
public:
std::optional<Zxid> zxid;
LogicalClock() = default;
explicit LogicalClock(UInt64 _zxid)
: zxid(_zxid)
{}
bool hasHappened() const
{
return bool(zxid);
}
/// happens-before relation with a reasonable time bound
bool happensBefore(const LogicalClock & other) const
{
return !zxid
|| (other.zxid && *zxid <= *other.zxid);
}
bool operator<=(const LogicalClock & other) const
{
return happensBefore(other);
}
/// strict equality check
bool operator==(const LogicalClock & other) const
{
return zxid == other.zxid;
}
};
class CleanStateClock
{
public:
    /// mzxid and version of the node at `discovery_path`, if it could be read
    LogicalClock discovery_zxid;
    std::optional<UInt32> discovery_version;

    /// mzxid and version of the node at `clean_state_path`, if it could be read
    LogicalClock clean_state_zxid;
    std::optional<UInt32> clean_state_version;

    /// Set to true by the watch callbacks; shared so that the callbacks may outlive this object
    std::shared_ptr<std::atomic_bool> stale;

    /// The snapshot is still valid, and either nothing was discovered
    /// or the clean state is not older than the discovery.
    bool is_clean() const
    {
        return
            !is_stale()
            && (
                !discovery_zxid.hasHappened()
                || (clean_state_zxid.hasHappened() && discovery_zxid <= clean_state_zxid));
    }

    /// True once a watch reported that one of the two nodes was created or changed
    bool is_stale() const
    {
        return stale->load();
    }

    /// Reads both nodes (each with a watch) and records their mzxid and version.
    /// A node that could not be read leaves the corresponding clock and version empty.
    CleanStateClock(
        const zkutil::ZooKeeperPtr & zookeeper,
        const String & discovery_path,
        const String & clean_state_path)
        : stale(std::make_shared<std::atomic_bool>(false))
    {
        Coordination::Stat stat{};
        String _some_data;

        /// Captures the flag by value (shared_ptr copy), not `this`
        auto watch_callback =
            [stale = stale] (const Coordination::WatchResponse & rsp)
            {
                auto logger = &Poco::Logger::get("ClusterCopier");
                if (rsp.error == Coordination::ZOK)
                {
                    switch (rsp.type)
                    {
                        case Coordination::CREATED:
                            LOG_DEBUG(logger, "CleanStateClock change: CREATED, at " << rsp.path);
                            stale->store(true);
                            break;
                        case Coordination::CHANGED:
                            LOG_DEBUG(logger, "CleanStateClock change: CHANGED, at " << rsp.path);
                            stale->store(true);
                            break;
                        default:
                            /// Other event types do not invalidate the snapshot
                            break;
                    }
                }
            };

        /// NOTE(review): whether a watch is registered when the node does not exist
        /// depends on tryGetWatch — confirm.
        if (zookeeper->tryGetWatch(discovery_path, _some_data, &stat, watch_callback))
        {
            discovery_zxid = LogicalClock(stat.mzxid);
            discovery_version = stat.version;
        }

        if (zookeeper->tryGetWatch(clean_state_path, _some_data, &stat, watch_callback))
        {
            clean_state_zxid = LogicalClock(stat.mzxid);
            clean_state_version = stat.version;
        }
    }

    /// Two snapshots are equal only if neither is stale and all recorded stamps match
    bool operator==(const CleanStateClock & other) const
    {
        return !is_stale()
            && !other.is_stale()
            && discovery_zxid == other.discovery_zxid
            && discovery_version == other.discovery_version
            && clean_state_zxid == other.clean_state_zxid
            && clean_state_version == other.clean_state_version;
    }

    bool operator!=(const CleanStateClock & other) const
    {
        return !(*this == other);
    }
};