ClusterCopier improvements (#1869)

* Fixed race condition in StorageDistributed. [#CLICKHOUSE-2]

* ClusterCopier improvements. [#CLICKHOUSE-3346]

* Add performance metrics. [#CLICKHOUSE-3346]
Vitaliy Lyudvichenko 2018-02-07 16:02:47 +03:00 committed by GitHub
parent 20af4d45a9
commit ef98bff6e1
6 changed files with 207 additions and 79 deletions


@ -57,6 +57,7 @@
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Server/StatusFile.h>
#include <Storages/registerStorages.h>
#include <Common/formatReadable.h>
namespace DB
@ -138,13 +139,25 @@ struct TaskPartition;
struct TaskShard;
struct TaskTable;
struct TaskCluster;
struct ClusterPartition;
using TasksPartition = std::map<String, TaskPartition>;
using ShardInfo = Cluster::ShardInfo;
using TaskShardPtr = std::shared_ptr<TaskShard>;
using TasksShard = std::vector<TaskShardPtr>;
using TasksTable = std::list<TaskTable>;
using PartitionToShards = std::map<String, TasksShard>;
using ClusterPartitions = std::map<String, ClusterPartition>;
/// Since we can drop only a whole partition on the cluster, the set of identical partitions across the cluster is an atomic entity
struct ClusterPartition
{
TasksShard shards; /// having that partition
Stopwatch watch;
UInt64 bytes_copied = 0;
UInt64 rows_copied = 0;
};
struct TaskPartition
{
@ -201,9 +214,11 @@ struct TaskTable
String getPartitionPath(const String & partition_name) const;
String getPartitionIsDirtyPath(const String & partition_name) const;
/// Used as task ID
String name_in_config;
/// Used as task ID
String table_id;
/// Source cluster and table
String cluster_pull_name;
DatabaseAndTableName table_pull;
@ -238,7 +253,18 @@ struct TaskTable
TasksShard all_shards;
TasksShard local_shards;
PartitionToShards partition_to_shards;
ClusterPartitions cluster_partitions;
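/// Returns the cluster-wide descriptor of a partition; throws LOGICAL_ERROR if the partition is unknown for this table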
ClusterPartition & getClusterPartition(const String & partition_name)
{
auto it = cluster_partitions.find(partition_name);
if (it == cluster_partitions.end())
throw Exception("There are no cluster partition " + partition_name + " in " + table_id, ErrorCodes::LOGICAL_ERROR);
return it->second;
}
Stopwatch watch;
UInt64 bytes_copied = 0;
UInt64 rows_copied = 0;
template <typename RandomEngine>
void initShards(RandomEngine && random_engine);
@ -257,6 +283,8 @@ struct TaskCluster
/// Limits number of simultaneous workers
size_t max_workers = 0;
/// Base settings for pull and push
Settings settings_common;
/// Settings used to fetch data
Settings settings_pull;
/// Settings used to insert data
@ -346,9 +374,9 @@ Block getBlockWithAllStreamData(const BlockInputStreamPtr & stream)
String TaskTable::getPartitionPath(const String & partition_name) const
{
return task_cluster.task_zookeeper_path // root
+ "/tables/" + escapeForFileName(name_in_config) // tables/table_hits
+ "/" + partition_name; // 201701
return task_cluster.task_zookeeper_path // root
+ "/tables/" + table_id // tables/dst_cluster.merge.hits
+ "/" + partition_name; // 201701
}
String TaskPartition::getPartitionPath() const
@ -405,6 +433,11 @@ TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfigurati
table_push.first = config.getString(table_prefix + "database_push");
table_push.second = config.getString(table_prefix + "table_push");
/// Used as node name in ZooKeeper
table_id = escapeForFileName(cluster_push_name)
+ "." + escapeForFileName(table_push.first)
+ "." + escapeForFileName(table_push.second);
engine_push_str = config.getString(table_prefix + "engine");
{
ParserStorage parser_storage;
@ -417,7 +450,7 @@ TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfigurati
sharding_key_ast = parseQuery(parser_expression, sharding_key_str);
engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second, sharding_key_ast);
table_split = DatabaseAndTableName(task_cluster.default_local_database, ".split." + name_in_config);
table_split = DatabaseAndTableName(task_cluster.default_local_database, ".split." + table_id);
}
where_condition_str = config.getString(table_prefix + "where_condition", "");
@ -538,8 +571,9 @@ TaskCluster::TaskCluster(const String & task_zookeeper_path_, const Poco::Util::
if (config.has(prefix + "settings"))
{
settings_pull.loadSettingsFromConfig(prefix + "settings", config);
settings_push.loadSettingsFromConfig(prefix + "settings", config);
settings_common.loadSettingsFromConfig(prefix + "settings", config);
settings_pull = settings_common;
settings_push = settings_common;
}
if (config.has(prefix + "settings_pull"))
@ -606,6 +640,7 @@ public:
settings_push.insert_distributed_sync = 1;
/// Set up clusters
context.getSettingsRef() = task_cluster->settings_common;
context.setClustersConfig(task_cluster_config, task_cluster->clusters_prefix);
/// Set up shards and their priority
@ -630,7 +665,7 @@ public:
ErrorCodes::LOGICAL_ERROR);
}
LOG_DEBUG(log, "Set up table task " << task_table.name_in_config << " ("
LOG_DEBUG(log, "Set up table task " << task_table.table_id << " (pull from "
<< "cluster " << task_table.cluster_pull_name
<< ", table " << getDatabaseDotTable(task_table.table_pull)
<< ", shard " << task_shard->info.shard_num << ")");
@ -654,7 +689,9 @@ public:
}
task_shard->partitions.emplace(partition_name, TaskPartition(*task_shard, partition_name));
task_table.partition_to_shards[partition_name].emplace_back(task_shard);
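/// Also register the shard in the cluster-wide partition descriptor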
ClusterPartition & cluster_partition = task_table.cluster_partitions[partition_name];
cluster_partition.shards.emplace_back(task_shard);
}
LOG_DEBUG(log, "Will fetch " << task_shard->partitions.size() << " partitions");
@ -672,6 +709,8 @@ public:
if (task_table.all_shards.empty())
continue;
task_table.watch.restart();
/// An optimization: first of all, try to process all partitions of the local shards
// for (const TaskShardPtr & shard : task_table.local_shards)
// {
@ -683,17 +722,18 @@ public:
// }
/// Then check and copy all shards until the whole partition is copied
for (const auto & partition_with_shards : task_table.partition_to_shards)
for (auto & elem : task_table.cluster_partitions)
{
const String & partition_name = partition_with_shards.first;
const TasksShard & shards_with_partition = partition_with_shards.second;
bool is_done;
const String & partition_name = elem.first;
ClusterPartition & cluster_partition = elem.second;
const TasksShard & shards_with_partition = cluster_partition.shards;
cluster_partition.watch.restart();
bool is_done = false;
size_t num_tries = 0;
constexpr size_t max_tries = 1000;
Stopwatch watch;
do
{
LOG_DEBUG(log, "Processing partition " << partition_name << " for the whole cluster"
@ -729,10 +769,33 @@ public:
++num_tries;
} while (!is_done && num_tries < max_tries);
if (!is_done)
throw Exception("Too many retries while copying partition", ErrorCodes::UNFINISHED);
if (is_done)
{
task_table.bytes_copied += cluster_partition.bytes_copied;
task_table.rows_copied += cluster_partition.rows_copied;
double elapsed = cluster_partition.watch.elapsedSeconds();
LOG_INFO(log, "It took " << std::setprecision(2) << elapsed << " seconds to copy partition " << partition_name
<< ": " << formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied) << " uncompressed bytes"
<< " and " << formatReadableQuantity(cluster_partition.rows_copied) << " rows are copied");
if (cluster_partition.rows_copied)
{
LOG_INFO(log, "Average partition speed: "
<< formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied / elapsed) << " per second.");
}
if (task_table.rows_copied)
{
LOG_INFO(log, "Average table " << task_table.table_id << " speed: "
<< formatReadableSizeWithDecimalSuffix(task_table.bytes_copied / elapsed) << " per second.");
}
}
else
LOG_INFO(log, "It took " << watch.elapsedSeconds() << " seconds to copy partition " << partition_name);
{
throw Exception("Too many retries while copying partition " + partition_name + ". Try the next one",
ErrorCodes::UNFINISHED);
}
}
}
}
@ -851,10 +914,36 @@ protected:
}
}
std::shared_ptr<ASTCreateQuery> rewriteCreateQueryStorage(const ASTPtr & create_query_pull, const DatabaseAndTableName & new_table,
const ASTPtr & new_storage_ast)
static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast)
{
auto & create = typeid_cast<ASTCreateQuery &>(*create_query_pull);
const ASTs & column_asts = typeid_cast<ASTCreateQuery &>(*query_ast).columns->children;
auto new_columns = std::make_shared<ASTExpressionList>();
for (const ASTPtr & column_ast : column_asts)
{
const ASTColumnDeclaration & column = typeid_cast<const ASTColumnDeclaration &>(*column_ast);
if (!column.default_specifier.empty())
{
ColumnDefaultType type = columnDefaultTypeFromString(column.default_specifier);
if (type == ColumnDefaultType::Materialized || type == ColumnDefaultType::Alias)
continue;
}
new_columns->children.emplace_back(column_ast->clone());
}
ASTPtr new_query_ast = query_ast->clone();
ASTCreateQuery & new_query = typeid_cast<ASTCreateQuery &>(*new_query_ast);
new_query.columns = new_columns.get();
new_query.children.at(0) = std::move(new_columns);
return new_query_ast;
}
std::shared_ptr<ASTCreateQuery> rewriteCreateQueryStorage(const ASTPtr & create_query_ast, const DatabaseAndTableName & new_table, const ASTPtr & new_storage_ast)
{
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*create_query_ast);
auto res = std::make_shared<ASTCreateQuery>(create);
if (create.storage == nullptr || new_storage_ast == nullptr)
@ -927,7 +1016,7 @@ protected:
LOG_DEBUG(log, "Execute distributed DROP PARTITION: " << query);
/// Limit the number of executing replicas to 1
size_t num_shards = executeQueryOnCluster(cluster_push, query, nullptr, &settings_push, PoolMode::GET_ALL, 1);
size_t num_shards = executeQueryOnCluster(cluster_push, query, nullptr, &settings_push, PoolMode::GET_ONE, 1);
if (num_shards < cluster_push->getShardCount())
{
@ -960,6 +1049,7 @@ protected:
{
TaskShard & task_shard = task_partition.task_shard;
TaskTable & task_table = task_shard.task_table;
ClusterPartition & cluster_partition = task_table.getClusterPartition(task_partition.name);
auto zookeeper = getZooKeeper();
auto acl = zookeeper->getDefaultACL();
@ -1069,8 +1159,8 @@ protected:
/// Create local Distributed tables:
/// a table fetching data from the current shard and a table inserting data into the whole destination cluster
DatabaseAndTableName table_shard(working_database_name, ".read_shard." + task_table.name_in_config);
DatabaseAndTableName table_split(working_database_name, ".split." + task_table.name_in_config);
DatabaseAndTableName table_shard(working_database_name, ".read_shard." + task_table.table_id);
DatabaseAndTableName table_split(working_database_name, ".split." + task_table.table_id);
{
/// Create special cluster with single shard
String shard_read_cluster_name = ".read_shard." + task_table.cluster_pull_name;
@ -1080,8 +1170,9 @@ protected:
auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
const auto & storage_split_ast = task_table.engine_split_ast;
auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_pull_ast, table_shard, storage_shard_ast);
auto create_table_split_ast = rewriteCreateQueryStorage(create_query_pull_ast, table_split, storage_split_ast);
auto create_query_ast = removeAliasColumnsFromCreateQuery(create_query_pull_ast);
auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, table_shard, storage_shard_ast);
auto create_table_split_ast = rewriteCreateQueryStorage(create_query_ast, table_split, storage_split_ast);
//LOG_DEBUG(log, "Create shard reading table. Query: " << queryToString(create_table_pull_ast));
dropAndCreateLocalTable(create_table_pull_ast);
@ -1152,7 +1243,7 @@ protected:
String query = queryToString(create_query_push_ast);
LOG_DEBUG(log, "Create remote push tables. Query: " << query);
executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push);
executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push, PoolMode::GET_MANY);
}
/// Do the copying
@ -1239,6 +1330,19 @@ protected:
return false;
};
/// Update statistics
/// It is quite rough: bytes_copied doesn't take DROP PARTITION into account.
if (auto in = dynamic_cast<IProfilingBlockInputStream *>(io_select.in.get()))
{
auto update_table_stats = [&] (const Progress & progress)
{
cluster_partition.bytes_copied += progress.bytes;
cluster_partition.rows_copied += progress.rows;
};
in->setProgressCallback(update_table_stats);
}
/// Main work is here
copyData(*io_select.in, *io_insert.out, cancel_check);
@ -1381,8 +1485,12 @@ protected:
return max_successful_executions_per_shard && num_successful_executions >= max_successful_executions_per_shard;
};
size_t num_replicas = cluster->getShardsAddresses().at(shard_index).size();
size_t num_local_replicas = shard.getLocalNodeCount();
size_t num_remote_replicas = num_replicas - num_local_replicas;
/// Usually there are no local replicas in this case, but run the loop just in case
for (size_t i = 0; i < shard.getLocalNodeCount(); ++i)
for (size_t i = 0; i < num_local_replicas; ++i)
{
auto interpreter = InterpreterFactory::get(query_ast, context);
interpreter->execute();
@ -1394,7 +1502,10 @@ protected:
/// Will try to make as many as possible queries
if (shard.hasRemoteConnections())
{
std::vector<IConnectionPool::Entry> connections = shard.pool->getMany(settings, pool_mode);
Settings current_settings = *settings;
current_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1;
std::vector<IConnectionPool::Entry> connections = shard.pool->getMany(&current_settings, pool_mode);
for (auto & connection : connections)
{
@ -1402,7 +1513,7 @@ protected:
{
try
{
RemoteBlockInputStream stream(*connection, query, context, settings);
RemoteBlockInputStream stream(*connection, query, context, &current_settings);
NullBlockOutputStream output;
copyData(stream, output);
@ -1473,7 +1584,7 @@ private:
void ClusterCopierApp::initialize(Poco::Util::Application & self)
{
Poco::Util::Application::initialize(self);
Poco::Util::ServerApplication::initialize(self);
is_help = config().has("help");
if (is_help)
@ -1487,8 +1598,11 @@ void ClusterCopierApp::initialize(Poco::Util::Application & self)
copy_fault_probability = std::max(std::min(config().getDouble("copy-fault-probability"), 1.0), 0.0);
base_dir = (config().has("base-dir")) ? config().getString("base-dir") : Poco::Path::current();
// process_id is '<hostname>#<pid>_<start_timestamp>'
process_id = std::to_string(Poco::Process::id()) + "_" + std::to_string(Poco::Timestamp().epochTime());
// process_id is '<hostname>#<start_timestamp>_<pid>'
time_t timestamp = Poco::Timestamp().epochTime();
auto pid = Poco::Process::id();
process_id = std::to_string(DateLUT::instance().toNumYYYYMMDDhhmmss(timestamp)) + "_" + std::to_string(pid);
host_id = escapeForFileName(getFQDNOrHostName()) + '#' + process_id;
process_path = Poco::Path(base_dir + "/clickhouse-copier_" + process_id).absolute().toString();
Poco::File(process_path).createDirectories();
@ -1515,6 +1629,8 @@ void ClusterCopierApp::handleHelp(const std::string &, const std::string &)
void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
{
Poco::Util::ServerApplication::defineOptions(options);
options.addOption(Poco::Util::Option("config-file", "c", "path to config file with ZooKeeper config", true)
.argument("config-file").binding("config-file"));
options.addOption(Poco::Util::Option("task-path", "", "path to task in ZooKeeper")
@ -1543,7 +1659,7 @@ void ClusterCopierApp::setupLogging()
split_channel->addChannel(log_file_channel);
log_file_channel->open();
if (!config().getBool("application.runAsService", true))
if (!config().getBool("application.runAsDaemon", true))
{
Poco::AutoPtr<Poco::ConsoleChannel> console_channel(new Poco::ConsoleChannel);
split_channel->addChannel(console_channel);
@ -1551,7 +1667,7 @@ void ClusterCopierApp::setupLogging()
}
Poco::AutoPtr<Poco::PatternFormatter> formatter(new Poco::PatternFormatter);
formatter->setProperty("pattern", "%L%Y-%m-%d %H:%M:%S.%i <%p> %s: %t");
formatter->setProperty("pattern", "%L%Y-%m-%d %H:%M:%S.%i [ %I ] <%p> %s: %t");
Poco::AutoPtr<Poco::FormattingChannel> formatting_channel(new Poco::FormattingChannel(formatter));
formatting_channel->setChannel(split_channel);
split_channel->open();


@ -68,9 +68,11 @@
<readonly>0</readonly>
</settings_push>
<!-- Common setting for fetch (pull) and insert (push) operations.
They are overlaid by <settings_pull/> and <settings_push/> respectively -->
<!-- Common setting for fetch (pull) and insert (push) operations. Also, copier process context uses it.
They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
</settings>


@ -325,11 +325,13 @@ void StorageDistributed::createDirectoryMonitors()
void StorageDistributed::requireDirectoryMonitor(const std::string & name)
{
std::lock_guard lock(cluster_nodes_mutex);
cluster_nodes_data[name].requireDirectoryMonitor(name, *this);
}
ConnectionPoolPtr StorageDistributed::requireConnectionPool(const std::string & name)
{
std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[name];
node_data.requireConnectionPool(name, *this);
return node_data.conneciton_pool;


@ -122,6 +122,7 @@ public:
void requireDirectoryMonitor(const std::string & name, StorageDistributed & storage);
};
std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
std::mutex cluster_nodes_mutex;
/// Used for global monotonic ordering of files to send.
SimpleIncrement file_names_increment;


@ -1,5 +1,44 @@
<?xml version="1.0"?>
<yandex>
<!-- How many simualteneous workers are posssible -->
<max_workers>4</max_workers>
<!-- Common setting for pull and push operations -->
<settings>
<connect_timeout>1</connect_timeout>
</settings>
<!-- Setting used to fetch data -->
<settings_pull>
</settings_pull>
<!-- Setting used to insert data -->
<settings_push>
</settings_push>
<!-- Tasks -->
<tables>
<hits>
<cluster_pull>cluster0</cluster_pull>
<database_pull>default</database_pull>
<table_pull>hits</table_pull>
<cluster_push>cluster1</cluster_push>
<database_push>default</database_push>
<table_push>hits</table_push>
<enabled_partitions> 0 1 2</enabled_partitions>
<!-- Engine of destination tables -->
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}', '{replica}') PARTITION BY d % 3 ORDER BY d SETTINGS index_granularity = 16</engine>
<!-- Which sarding key to use while copying -->
<sharding_key>d + 1</sharding_key>
<!-- Optional expression that filter copying data -->
<where_condition>d - d = 0</where_condition>
</hits>
</tables>
<!-- Configuration of clusters -->
<remote_servers>
@ -42,47 +81,13 @@
<host>s1_1_0</host>
<port>9000</port>
</replica>
<replica>
<!-- Died replica -->
<host>255.255.255.255</host>
<port>9000</port>
</replica>
</shard>
</cluster1>
</remote_servers>
<!-- How many simualteneous workers are posssible -->
<max_workers>4</max_workers>
<!-- Common setting for pull and push operations -->
<settings>
</settings>
<!-- Setting used to fetch data -->
<settings_pull>
</settings_pull>
<!-- Setting used to insert data -->
<settings_push>
</settings_push>
<!-- Tasks -->
<tables>
<hits>
<cluster_pull>cluster0</cluster_pull>
<database_pull>default</database_pull>
<table_pull>hits</table_pull>
<cluster_push>cluster1</cluster_push>
<database_push>default</database_push>
<table_push>hits</table_push>
<enabled_partitions> 0 1 2</enabled_partitions>
<!-- Engine of destination tables -->
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}', '{replica}') PARTITION BY d % 3 ORDER BY d SETTINGS index_granularity = 16</engine>
<!-- Which sarding key to use while copying -->
<sharding_key>d + 1</sharding_key>
<!-- Optional expression that filter copying data -->
<where_condition>d - d = 0</where_condition>
</hits>
</tables>
</yandex>


@ -74,8 +74,10 @@ def _test_copying(cmd_options):
ddl_check_query(instance, "DROP TABLE IF EXISTS hits ON CLUSTER cluster0")
ddl_check_query(instance, "DROP TABLE IF EXISTS hits ON CLUSTER cluster1")
ddl_check_query(instance, "DROP TABLE IF EXISTS hits_all ON CLUSTER cluster0")
ddl_check_query(instance, "DROP TABLE IF EXISTS hits_all ON CLUSTER cluster1")
ddl_check_query(instance, "CREATE TABLE hits ON CLUSTER cluster0 (d UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster_{cluster}/{shard}', '{replica}') PARTITION BY d % 3 ORDER BY d SETTINGS index_granularity = 16")
ddl_check_query(instance, "CREATE TABLE hits ON CLUSTER cluster0 (d UInt64, d1 UInt64 MATERIALIZED d+1) ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster_{cluster}/{shard}', '{replica}') PARTITION BY d % 3 ORDER BY d SETTINGS index_granularity = 16")
ddl_check_query(instance, "CREATE TABLE hits_all ON CLUSTER cluster0 (d UInt64) ENGINE=Distributed(cluster0, default, hits, d)")
ddl_check_query(instance, "CREATE TABLE hits_all ON CLUSTER cluster1 (d UInt64) ENGINE=Distributed(cluster1, default, hits, d + 1)")
instance.query("INSERT INTO hits_all SELECT * FROM system.numbers LIMIT 1002")