Add support of any partition key. [#CLICKHOUSE-3606]

This commit is contained in:
Vitaliy Lyudvichenko 2018-02-21 00:03:38 +03:00
parent 585b80acf5
commit fbe4066c15
5 changed files with 456 additions and 172 deletions

View File

@ -46,6 +46,7 @@
#include <Parsers/queryToString.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Databases/DatabaseMemory.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
@ -88,8 +89,20 @@ static ConfigurationPtr getConfigurationFromXMLString(const std::string & xml_da
using DatabaseAndTableName = std::pair<String, String>;
String getDatabaseDotTable(const String & database, const String & table)
return backQuoteIfNeed(database) + "." + backQuoteIfNeed(table);
String getDatabaseDotTable(const DatabaseAndTableName & db_and_table)
return getDatabaseDotTable(db_and_table.first, db_and_table.second);
enum class TaskState
@ -138,35 +151,24 @@ struct TaskStateWithOwner
/// Hierarchical description of the tasks
struct TaskPartition;
struct ShardPartition;
struct TaskShard;
struct TaskTable;
struct TaskCluster;
struct ClusterPartition;
using TasksPartition = std::map<String, TaskPartition>;
using TasksPartition = std::map<String, ShardPartition>;
using ShardInfo = Cluster::ShardInfo;
using TaskShardPtr = std::shared_ptr<TaskShard>;
using TasksShard = std::vector<TaskShardPtr>;
using TasksTable = std::list<TaskTable>;
using ClusterPartitions = std::map<String, ClusterPartition>;
/// Contains all cluster shards (sorted by neighborhood) containig a partition
struct ClusterPartition
/// Just destination partition of a shard
struct ShardPartition
TasksShard shards; /// having that partition
Stopwatch watch;
UInt64 bytes_copied = 0;
UInt64 rows_copied = 0;
size_t total_tries = 0;
struct TaskPartition
TaskPartition(TaskShard & parent, const String & name_) : task_shard(parent), name(name_) {}
ShardPartition(TaskShard & parent, const String & name_quoted_) : task_shard(parent), name(name_quoted_) {}
String getPartitionPath() const;
String getCommonPartitionIsDirtyPath() const;
@ -204,11 +206,39 @@ struct TaskShard
UInt32 numberInCluster() const { return info.shard_num; }
UInt32 indexInCluster() const { return info.shard_num - 1; }
TasksPartition partitions;
String getDescription() const;
/// Used to sort clusters by thier proximity
ShardPriority priority;
/// Column with unique destination partitions (computed from engine_push_partition_key expr.) in the shard
ColumnWithTypeAndName partition_key_column;
/// There is a task for each destination partition
TasksPartition partition_tasks;
/// Last CREATE TABLE query of the table of the shard
ASTPtr current_pull_table_create_query;
/// Internal distributed tables
DatabaseAndTableName table_read_shard;
DatabaseAndTableName table_split_shard;
/// Contains all cluster shards that contain a partition (and sorted by the proximity)
struct ClusterPartition
TasksShard shards; /// having that partition
Stopwatch watch;
UInt64 bytes_copied = 0;
UInt64 rows_copied = 0;
size_t total_tries = 0;
struct TaskTable
TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix,
@ -235,6 +265,7 @@ struct TaskTable
/// Storage of destination table
String engine_push_str;
ASTPtr engine_push_ast;
ASTPtr engine_push_partition_key_ast;
/// Local Distributed table used to split data
DatabaseAndTableName table_split;
@ -252,7 +283,8 @@ struct TaskTable
/// Filter partitions that should be copied
bool has_enabled_partitions = false;
NameSet enabled_partitions;
Strings enabled_partitions;
NameSet enabled_partitions_set;
/// Prioritized list of shards
TasksShard all_shards;
@ -277,6 +309,7 @@ struct TaskTable
void initShards(RandomEngine && random_engine);
struct TaskCluster
TaskCluster(const String & task_zookeeper_path_, const String & default_local_database_)
@ -284,6 +317,7 @@ struct TaskCluster
void loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key = "");
/// Set (or update) settings and max_workers param
void reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key = "");
/// Base node for all tasks. Its structure:
@ -315,17 +349,6 @@ struct TaskCluster
String getDatabaseDotTable(const String & database, const String & table)
return backQuoteIfNeed(database) + "." + backQuoteIfNeed(table);
String getDatabaseDotTable(const DatabaseAndTableName & db_and_table)
return getDatabaseDotTable(db_and_table.first, db_and_table.second);
/// Atomically checks that is_dirty node is not exists, and made the remaining op
/// Returns relative number of failed operation in the second field (the passed op has 0 index)
static void checkNoNodeAndCommit(
@ -381,42 +404,43 @@ Block getBlockWithAllStreamData(const BlockInputStreamPtr & stream)
return squashStreamIntoOneBlock(stream)->read();
// Path getters
/// Path getters
String TaskTable::getPartitionPath(const String & partition_name) const
return task_cluster.task_zookeeper_path // root
+ "/tables/" + table_id // tables/dst_cluster.merge.hits
+ "/" + partition_name; // 201701
return task_cluster.task_zookeeper_path // root
+ "/tables/" + table_id // tables/dst_cluster.merge.hits
+ "/" + escapeForFileName(partition_name); // 201701
String TaskPartition::getPartitionPath() const
String ShardPartition::getPartitionPath() const
return task_shard.task_table.getPartitionPath(name);
String TaskPartition::getShardStatusPath() const
String ShardPartition::getShardStatusPath() const
// /root/table_test.hits/201701/1
return getPartitionPath() + "/shards/" + toString(task_shard.numberInCluster());
String TaskPartition::getPartitionShardsPath() const
String ShardPartition::getPartitionShardsPath() const
return getPartitionPath() + "/shards";
String TaskPartition::getPartitionActiveWorkersPath() const
String ShardPartition::getPartitionActiveWorkersPath() const
return getPartitionPath() + "/partition_active_workers";
String TaskPartition::getActiveWorkerPath() const
String ShardPartition::getActiveWorkerPath() const
return getPartitionActiveWorkersPath() + "/" + toString(task_shard.numberInCluster());
String TaskPartition::getCommonPartitionIsDirtyPath() const
String ShardPartition::getCommonPartitionIsDirtyPath() const
return getPartitionPath() + "/is_dirty";
@ -426,6 +450,58 @@ String TaskTable::getPartitionIsDirtyPath(const String & partition_name) const
return getPartitionPath(partition_name) + "/is_dirty";
String DB::TaskShard::getDescription() const
return "" + toString(numberInCluster())
+ " of pull table " + getDatabaseDotTable(task_table.table_pull)
+ " of cluster " + task_table.cluster_pull_name;
static bool isExtedndedDefinitionStorage(const ASTPtr & storage_ast)
const ASTStorage & storage = typeid_cast<const ASTStorage &>(*storage_ast);
return storage.partition_by || storage.order_by || storage.sample_by;
static ASTPtr extractPartitionKey(const ASTPtr & storage_ast)
String storage_str = queryToString(storage_ast);
const ASTStorage & storage = typeid_cast<const ASTStorage &>(*storage_ast);
const ASTFunction & engine = typeid_cast<const ASTFunction &>(*storage.engine);
if (!endsWith(, "MergeTree"))
throw Exception("Unsupported engine was specified in " + storage_str + ", only *MergeTree engines are supported",
ASTPtr arguments_ast = engine.arguments->clone();
ASTs & arguments = typeid_cast<ASTExpressionList &>(*arguments_ast).children;
if (isExtedndedDefinitionStorage(storage_ast))
if (storage.partition_by)
return storage.partition_by->clone();
static const char * all = "all";
return std::make_shared<ASTLiteral>(StringRange(all, all + strlen(all)), Field(all, strlen(all)));
bool is_replicated = startsWith(, "Replicated");
size_t min_args = is_replicated ? 3 : 1;
if (arguments.size() < min_args)
throw Exception("Expected at least " + toString(min_args) + " arguments in " + storage_str, ErrorCodes::BAD_ARGUMENTS);
ASTPtr & month_arg = is_replicated ? arguments[2] : arguments[1];
return makeASTFunction("toYYYYMM", month_arg->clone());
TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix_,
const String & table_key)
@ -453,6 +529,7 @@ TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfigurati
ParserStorage parser_storage;
engine_push_ast = parseQuery(parser_storage, engine_push_str);
engine_push_partition_key_ast = extractPartitionKey(engine_push_ast);
sharding_key_str = config.getString(table_prefix + "sharding_key");
@ -482,13 +559,12 @@ TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfigurati
Strings keys;
config.keys(enabled_partitions_prefix, keys);
Strings partitions;
if (keys.empty())
/// Parse list of partition from space-separated string
String partitions_str = config.getString(table_prefix + "enabled_partitions");
boost::trim_if(partitions_str, isWhitespaceASCII);
boost::split(partitions, partitions_str, isWhitespaceASCII, boost::token_compress_on);
boost::split(enabled_partitions, partitions_str, isWhitespaceASCII, boost::token_compress_on);
@ -498,13 +574,12 @@ TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfigurati
if (!startsWith(key, "partition"))
throw Exception("Unknown key " + key + " in " + enabled_partitions_prefix, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
partitions.emplace_back(config.getString(enabled_partitions_prefix + "." + key));
enabled_partitions.emplace_back(config.getString(enabled_partitions_prefix + "." + key));
std::copy(partitions.begin(), partitions.end(), std::inserter(enabled_partitions, enabled_partitions.begin()));
std::copy(enabled_partitions.begin(), enabled_partitions.end(), std::inserter(enabled_partitions_set, enabled_partitions_set.begin()));
@ -670,9 +745,11 @@ public:
LOG_DEBUG(log, "Loaded " << task_cluster->table_tasks.size() << " table tasks");
/// Compute set of partitions, set of partitions aren't changed
/// Compute set of partitions, assume set of partitions aren't changed during the processing
for (auto & task_table : task_cluster->table_tasks)
LOG_DEBUG(log, "Set up table task " << task_table.table_id);
for (const TaskShardPtr & task_shard : task_table.all_shards)
if (task_shard->info.pool == nullptr)
@ -681,36 +758,72 @@ public:
LOG_DEBUG(log, "Set up table task " << task_table.table_id << " (pull from "
<< "cluster " << task_table.cluster_pull_name
<< ", table " << getDatabaseDotTable(task_table.table_pull)
<< ", shard " << task_shard->info.shard_num << ")");
LOG_DEBUG(log, "Set up shard " << task_shard->getDescription());
LOG_DEBUG(log, "There are " << task_table.all_shards.size() << " shards, " << task_table.local_shards.size() << " of them are local ones");
LOG_DEBUG(log, "There are "
<< task_table.all_shards.size() << " shards, "
<< task_table.local_shards.size() << " of them are remote ones");
auto existing_partitions_names = getShardPartitions(*task_shard);
Strings filtered_partitions_names;
auto connection_entry = task_shard->info.pool->get(&task_cluster->settings_pull);
LOG_DEBUG(log, "Will get meta information for shard " << task_shard->numberInCluster()
<< " from replica " << connection_entry->getDescription());
Strings partitions = getRemotePartitions(task_table.table_pull, *connection_entry, &task_cluster->settings_pull);
for (const String & partition_name : partitions)
/// Check that user specified correct partition names
auto check_partition_format = [&] (const String & partition_text_quoted)
/// Do not process partition if it is not in enabled_partitions list
if (task_table.has_enabled_partitions && !task_table.enabled_partitions.count(partition_name))
const DataTypePtr & type = task_shard->partition_key_column.type;
MutableColumnPtr column_dummy = type->createColumn();
ReadBufferFromString rb(partition_text_quoted);
LOG_DEBUG(log, "Will skip partition " << partition_name);
type->deserializeTextQuoted(*column_dummy, rb);
catch (Exception & e)
throw Exception("Partition " + partition_text_quoted + " has incorrect format. " + e.displayText(), ErrorCodes::BAD_ARGUMENTS);
if (task_table.has_enabled_partitions)
/// Process partition in order specified by <enabled_partitions/>
for (const String & partition_name : task_table.enabled_partitions)
auto it = existing_partitions_names.find(partition_name);
/// Do not process partition if it is not in enabled_partitions list
if (it == existing_partitions_names.end())
LOG_WARNING(log, "There is no enabled " << partition_name << " specified in enabled_partitions in shard "
<< task_shard->getDescription());
task_shard->partitions.emplace(partition_name, TaskPartition(*task_shard, partition_name));
for (const String & partition_name : existing_partitions_names)
if (!task_table.enabled_partitions_set.count(partition_name))
LOG_DEBUG(log, "Partition " << partition_name << " will not be processed, since it is not in "
<< "enabled_partitions of " << task_table.table_id);
for (const String & partition_name : existing_partitions_names)
for (const String & partition_name : filtered_partitions_names)
task_shard->partition_tasks.emplace(partition_name, ShardPartition(*task_shard, partition_name));
ClusterPartition & cluster_partition = task_table.cluster_partitions[partition_name];
LOG_DEBUG(log, "Will fetch " << task_shard->partitions.size() << " partitions");
LOG_DEBUG(log, "Will copy " << task_shard->partition_tasks.size() << " partitions from shard " << task_shard->getDescription());
@ -786,11 +899,11 @@ public:
/// NOTE: shards are sorted by "distance" to current host
for (const TaskShardPtr & shard : shards_with_partition)
auto it_shard_partition = shard->partitions.find(partition_name);
if (it_shard_partition == shard->partitions.end())
auto it_shard_partition = shard->partition_tasks.find(partition_name);
if (it_shard_partition == shard->partition_tasks.end())
throw Exception("There are no such partition in a shard. This is a bug.", ErrorCodes::LOGICAL_ERROR);
TaskPartition & task_shard_partition = it_shard_partition->second;
ShardPartition & task_shard_partition = it_shard_partition->second;
if (processPartitionTask(task_shard_partition))
@ -905,7 +1018,7 @@ public:
Strings status_paths;
for (auto & shard : shards_with_partition)
TaskPartition & task_shard_partition = shard->partitions.find(partition_name)->second;
ShardPartition & task_shard_partition = shard->partition_tasks.find(partition_name)->second;
@ -998,6 +1111,7 @@ protected:
/// Removes MATERIALIZED and ALIAS columns from create table query
static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast)
const ASTs & column_asts = typeid_cast<ASTCreateQuery &>(*query_ast).columns->children;
@ -1025,6 +1139,7 @@ protected:
return new_query_ast;
/// Replaces ENGINE and table name in a create query
std::shared_ptr<ASTCreateQuery> rewriteCreateQueryStorage(const ASTPtr & create_query_ast, const DatabaseAndTableName & new_table, const ASTPtr & new_storage_ast)
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*create_query_ast);
@ -1043,7 +1158,7 @@ protected:
return res;
bool tryDropPartition(TaskPartition & task_partition, const zkutil::ZooKeeperPtr & zookeeper)
bool tryDropPartition(ShardPartition & task_partition, const zkutil::ZooKeeperPtr & zookeeper)
if (is_safe_mode)
throw Exception("DROP PARTITION is prohibited in safe mode", ErrorCodes::NOT_IMPLEMENTED);
@ -1117,7 +1232,7 @@ protected:
bool processPartitionTask(TaskPartition & task_partition)
bool processPartitionTask(ShardPartition & task_partition)
bool res;
@ -1144,7 +1259,7 @@ protected:
return res;
bool processPartitionTaskImpl(TaskPartition & task_partition)
bool processPartitionTaskImpl(ShardPartition & task_partition)
TaskShard & task_shard = task_partition.task_shard;
TaskTable & task_table = task_shard.task_table;
@ -1172,7 +1287,7 @@ protected:
String query;
query += "SELECT " + fields + " FROM " + getDatabaseDotTable(from_table);
query += " WHERE (_part LIKE '" + + "%')";
query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) + " = " + + ")";
if (!task_table.where_condition_str.empty())
query += " AND (" + task_table.where_condition_str + ")";
if (!limit.empty())
@ -1245,45 +1360,13 @@ protected:
/// We need to update table definitions for each part, it could be changed after ALTER
ASTPtr query_create_pull_table;
/// Fetch and parse (possibly) new definition
auto connection_entry =>get(&task_cluster->settings_pull);
String create_query_pull_str = getRemoteCreateTable(task_table.table_pull, *connection_entry, &task_cluster->settings_pull);
ParserCreateQuery parser_create_query;
query_create_pull_table = parseQuery(parser_create_query, create_query_pull_str);
/// Create local Distributed tables:
/// a table fetching data from current shard and a table inserting data to the whole destination cluster
DatabaseAndTableName table_shard(working_database_name, ".read_shard." + task_table.table_id);
DatabaseAndTableName table_split(working_database_name, ".split." + task_table.table_id);
/// Create special cluster with single shard
String shard_read_cluster_name = ".read_shard." + task_table.cluster_pull_name;
ClusterPtr cluster_pull_current_shard = task_table.cluster_pull->getClusterWithSingleShard(task_shard.indexInCluster());
context.setCluster(shard_read_cluster_name, cluster_pull_current_shard);
auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
const auto & storage_split_ast = task_table.engine_split_ast;
auto create_query_ast = removeAliasColumnsFromCreateQuery(query_create_pull_table);
auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, table_shard, storage_shard_ast);
auto create_table_split_ast = rewriteCreateQueryStorage(create_query_ast, table_split, storage_split_ast);
//LOG_DEBUG(log, "Create shard reading table. Query: " << queryToString(create_table_pull_ast));
//LOG_DEBUG(log, "Create split table. Query: " << queryToString(create_table_split_ast));
/// We need to update table definitions for each partition, it could be changed after ALTER
/// Check that destination partition is empty if we are first worker
/// NOTE: this check is incorrect if pull and push tables have different partition key!
ASTPtr query_select_ast = get_select_query(table_split, "count()");
ASTPtr query_select_ast = get_select_query(task_shard.table_split_shard, "count()");
UInt64 count;
Context local_context = context;
@ -1292,9 +1375,8 @@ protected:
local_context.getSettingsRef().skip_unavailable_shards = true;
InterpreterSelectQuery interperter(query_select_ast, local_context);
BlockIO io = interperter.execute();
Block block = getBlockWithAllStreamData(;
Block block = getBlockWithAllStreamData(interperter.execute().in);
count = (block) ? block.safeGetByPosition(0).column->getUInt(0) : 0;
@ -1337,15 +1419,15 @@ protected:
/// Try create table (if not exists) on each shard
auto create_query_push_ast = rewriteCreateQueryStorage(query_create_pull_table, task_table.table_push, task_table.engine_push_ast);
auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query, task_table.table_push, task_table.engine_push_ast);
typeid_cast<ASTCreateQuery &>(*create_query_push_ast).if_not_exists = true;
String query = queryToString(create_query_push_ast);
LOG_DEBUG(log, "Create remote push tables. Query: " << query);
LOG_DEBUG(log, "Create destination tables. Query: " << query);
size_t shards = executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push,
LOG_DEBUG(log, "Remote push tables have been created on " << shards << " shards of "
<< task_table.cluster_push->getShardCount());
LOG_DEBUG(log, "Destination tables " << getDatabaseDotTable(task_table.table_push) << " have been created on " << shards
<< " shards of " << task_table.cluster_push->getShardCount());
/// Do the copying
@ -1359,14 +1441,14 @@ protected:
// Select all fields
ASTPtr query_select_ast = get_select_query(table_shard, "*", inject_fault ? "1" : "");
ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", inject_fault ? "1" : "");
LOG_DEBUG(log, "Executing SELECT query: " << queryToString(query_select_ast));
ASTPtr query_insert_ast;
String query;
query += "INSERT INTO " + getDatabaseDotTable(table_split) + " VALUES ";
query += "INSERT INTO " + getDatabaseDotTable(task_shard.table_split_shard) + " VALUES ";
ParserQuery p_query( + query.size());
query_insert_ast = parseQuery(p_query, query);
@ -1521,35 +1603,90 @@ protected:
return typeid_cast<const ColumnString &>(*block.safeGetByPosition(0).column).getDataAt(0).toString();
Strings getRemotePartitions(const DatabaseAndTableName & table, Connection & connection, const Settings * settings = nullptr)
ASTPtr getCreateTableForPullShard(TaskShard & task_shard)
Block block;
/// Fetch and parse (possibly) new definition
auto connection_entry =>get(&task_cluster->settings_pull);
String create_query_pull_str = getRemoteCreateTable(task_shard.task_table.table_pull, *connection_entry,
ParserCreateQuery parser_create_query;
return parseQuery(parser_create_query, create_query_pull_str);
void createShardInternalTables(TaskShard & task_shard)
TaskTable & task_table = task_shard.task_table;
/// We need to update table definitions for each part, it could be changed after ALTER
task_shard.current_pull_table_create_query = getCreateTableForPullShard(task_shard);
/// Create local Distributed tables:
/// a table fetching data from current shard and a table inserting data to the whole destination cluster
String read_shard_prefix = ".read_shard_" + toString(task_shard.indexInCluster()) + ".";
String split_shard_prefix = ".split.";
task_shard.table_read_shard = DatabaseAndTableName(working_database_name, read_shard_prefix + task_table.table_id);
task_shard.table_split_shard = DatabaseAndTableName(working_database_name, split_shard_prefix + task_table.table_id);
/// Create special cluster with single shard
String shard_read_cluster_name = read_shard_prefix + task_table.cluster_pull_name;
ClusterPtr cluster_pull_current_shard = task_table.cluster_pull->getClusterWithSingleShard(task_shard.indexInCluster());
context.setCluster(shard_read_cluster_name, cluster_pull_current_shard);
auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
const auto & storage_split_ast = task_table.engine_split_ast;
auto create_query_ast = removeAliasColumnsFromCreateQuery(task_shard.current_pull_table_create_query);
auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_read_shard, storage_shard_ast);
auto create_table_split_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_split_shard, storage_split_ast);
//LOG_DEBUG(log, "Create shard reading table. Query: " << queryToString(create_table_pull_ast));
//LOG_DEBUG(log, "Create split table. Query: " << queryToString(create_table_split_ast));
std::set<String> getShardPartitions(TaskShard & task_shard)
TaskTable & task_table = task_shard.task_table;
String query;
WriteBufferFromOwnString wb;
<< " database = " << DB::quote << table.first
<< " AND table = " << DB::quote << table.second;
block = getBlockWithAllStreamData(std::make_shared<RemoteBlockInputStream>(
connection, wb.str(), Block{{ ColumnString::create(), std::make_shared<DataTypeString>(), "partition" }}, context, settings));
wb << "SELECT DISTINCT " << queryToString(task_table.engine_push_partition_key_ast) << " AS partition FROM"
<< " " << getDatabaseDotTable(task_shard.table_read_shard) << " ORDER BY partition DESC";
query = wb.str();
Strings res;
LOG_DEBUG(log, "Computing destination partition set, executing query: " << query);
ParserQuery parser_query( + query.size());
ASTPtr query_ast = parseQuery(parser_query, query);
Context local_context = context;
InterpreterSelectQuery interp(query_ast, local_context);
Block block = getBlockWithAllStreamData(interp.execute().in);
std::set<String> res;
if (block)
auto & partition_col = typeid_cast<const ColumnString &>(*block.getByName("partition").column);
for (size_t i = 0; i < partition_col.size(); ++i)
if (!existsRemoteTable(table, connection))
ColumnWithTypeAndName & column = block.getByPosition(0);
task_shard.partition_key_column = column;
for (size_t i = 0; i < column.column->size(); ++i)
throw Exception("Table " + getDatabaseDotTable(table) + " is not exists on server "
+ connection.getDescription(), ErrorCodes::UNKNOWN_TABLE);
WriteBufferFromOwnString wb;
column.type->serializeTextQuoted(*column.column, i, wb);
LOG_DEBUG(log, "There are " << res.size() << " destination partitions in shard " << task_shard.getDescription());
return res;
@ -1610,7 +1747,7 @@ protected:
Settings current_settings = settings ? *settings : task_cluster->settings_common;
current_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1;
std::vector<IConnectionPool::Entry> connections = shard.pool->getMany(&current_settings, pool_mode);
auto connections = shard.pool->getMany(&current_settings, pool_mode);
for (auto & connection : connections)
@ -1619,7 +1756,8 @@ protected:
RemoteBlockInputStream stream(*connection, query, context, &current_settings);
/// CREATE TABLE and DROP PARTITION return empty block
RemoteBlockInputStream stream(*connection, query, Block(), context, &current_settings);
NullBlockOutputStream output;
copyData(stream, output);
@ -1780,7 +1918,7 @@ void ClusterCopierApp::setupLogging()
Poco::AutoPtr<Poco::PatternFormatter> formatter(new Poco::PatternFormatter);
formatter->setProperty("pattern", "%L%Y-%m-%d %H:%M:%S.%i [ %I ] <%p> %s: %t");
formatter->setProperty("pattern", "%L%Y-%m-%d %H:%M:%S.%i <%p> %s: %t");
Poco::AutoPtr<Poco::FormattingChannel> formatting_channel(new Poco::FormattingChannel(formatter));
@ -1840,7 +1978,7 @@ int ClusterCopierApp::main(const std::vector<std::string> &)
catch (...)
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
tryLogCurrentException(&Poco::Logger::root(), __PRETTY_FUNCTION__);
auto code = getCurrentExceptionCode();
return (code) ? code : -1;

View File

@ -99,9 +99,8 @@
NOTE: If the first worker starts insert data and detects that destination partition is not empty then the partition will
be dropped and refilled, take it into account if you already have some data in destination tables. You could directly
specify partitions that should be copied in <enabled_partitions/>.
NOTE: Currently partition key of source and destination tables should be the same.
specify partitions that should be copied in <enabled_partitions/>, they should be in quoted format like partition column of table.
<engine>ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/hits2/{shard}/hits2', '{replica}', EventDate, (CounterID, EventDate), 8192)</engine>

View File

@ -30,7 +30,7 @@
<enabled_partitions> 0 1 2</enabled_partitions>
<!-- Engine of destination tables -->
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}', '{replica}') PARTITION BY d % 3 ORDER BY d SETTINGS index_granularity = 16</engine>
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/hits', '{replica}') PARTITION BY d % 3 ORDER BY d SETTINGS index_granularity = 16</engine>
<!-- Which sarding key to use while copying -->
<sharding_key>d + 1</sharding_key>

View File

@ -0,0 +1,89 @@
<?xml version="1.0"?>
<!-- How many simualteneous workers are posssible -->
<!-- Common setting for pull and push operations -->
<!-- Tasks -->
<!-- Engine of destination tables -->
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/b', '{replica}') PARTITION BY toMonday(date) ORDER BY d</engine>
<!-- Which sarding key to use while copying -->
<sharding_key>d + 1</sharding_key>
<!-- Optional expression that filter copying data -->
<!-- <where_condition></where_condition> -->
<!-- Configuration of clusters -->
<!-- Died replica -->

View File

@ -69,27 +69,86 @@ def started_cluster():
def _test_copying(cmd_options):
instance = cluster.instances['s0_0_0']
class Task1:
ddl_check_query(instance, "DROP TABLE IF EXISTS hits ON CLUSTER cluster0")
ddl_check_query(instance, "DROP TABLE IF EXISTS hits ON CLUSTER cluster1")
ddl_check_query(instance, "DROP TABLE IF EXISTS hits_all ON CLUSTER cluster0")
ddl_check_query(instance, "DROP TABLE IF EXISTS hits_all ON CLUSTER cluster1")
def __init__(self, cluster):
self.cluster = cluster
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task0_description.xml'), 'r').read()
ddl_check_query(instance, "CREATE TABLE hits ON CLUSTER cluster0 (d UInt64, d1 UInt64 MATERIALIZED d+1) ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster_{cluster}/{shard}', '{replica}') PARTITION BY d % 3 ORDER BY d SETTINGS index_granularity = 16")
ddl_check_query(instance, "CREATE TABLE hits_all ON CLUSTER cluster0 (d UInt64) ENGINE=Distributed(cluster0, default, hits, d)")
ddl_check_query(instance, "CREATE TABLE hits_all ON CLUSTER cluster1 (d UInt64) ENGINE=Distributed(cluster1, default, hits, d + 1)")
instance.query("INSERT INTO hits_all SELECT * FROM system.numbers LIMIT 1002")
def start(self):
instance = cluster.instances['s0_0_0']
for cluster_num in ["0", "1"]:
ddl_check_query(instance, "DROP DATABASE IF EXISTS default ON CLUSTER cluster{}".format(cluster_num))
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS default ON CLUSTER cluster{}".format(cluster_num))
ddl_check_query(instance, "CREATE TABLE hits ON CLUSTER cluster0 (d UInt64, d1 UInt64 MATERIALIZED d+1) ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster_{cluster}/{shard}/hits', '{replica}') PARTITION BY d % 3 ORDER BY d SETTINGS index_granularity = 16")
ddl_check_query(instance, "CREATE TABLE hits_all ON CLUSTER cluster0 (d UInt64) ENGINE=Distributed(cluster0, default, hits, d)")
ddl_check_query(instance, "CREATE TABLE hits_all ON CLUSTER cluster1 (d UInt64) ENGINE=Distributed(cluster1, default, hits, d + 1)")
instance.query("INSERT INTO hits_all SELECT * FROM system.numbers LIMIT 1002", settings={"insert_distributed_sync": 1})
def check(self):
assert TSV(self.cluster.instances['s0_0_0'].query("SELECT count() FROM hits_all")) == TSV("1002\n")
assert TSV(self.cluster.instances['s1_0_0'].query("SELECT count() FROM hits_all")) == TSV("1002\n")
assert TSV(self.cluster.instances['s1_0_0'].query("SELECT DISTINCT d % 2 FROM hits")) == TSV("1\n")
assert TSV(self.cluster.instances['s1_1_0'].query("SELECT DISTINCT d % 2 FROM hits")) == TSV("0\n")
instance = self.cluster.instances['s0_0_0']
ddl_check_query(instance, "DROP TABLE hits_all ON CLUSTER cluster0")
ddl_check_query(instance, "DROP TABLE hits_all ON CLUSTER cluster1")
ddl_check_query(instance, "DROP TABLE hits ON CLUSTER cluster0")
ddl_check_query(instance, "DROP TABLE hits ON CLUSTER cluster1")
class Task2:
def __init__(self, cluster):
self.cluster = cluster
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_month_to_week_description.xml'), 'r').read()
def start(self):
instance = cluster.instances['s0_0_0']
for cluster_num in ["0", "1"]:
ddl_check_query(instance, "DROP DATABASE IF EXISTS default ON CLUSTER cluster{}".format(cluster_num))
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS default ON CLUSTER cluster{}".format(cluster_num))
ddl_check_query(instance, "CREATE TABLE a ON CLUSTER cluster0 (date Date, d UInt64, d1 UInt64 ALIAS d+1) ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster_{cluster}/{shard}/a', '{replica}', date, intHash64(d), (date, intHash64(d)), 8192)")
ddl_check_query(instance, "CREATE TABLE a_all ON CLUSTER cluster0 (date Date, d UInt64) ENGINE=Distributed(cluster0, default, a, d)")
instance.query("INSERT INTO a_all SELECT toDate(17581 + number) AS date, number AS d FROM system.numbers LIMIT 85", settings={"insert_distributed_sync": 1})
def check(self):
assert TSV(self.cluster.instances['s0_0_0'].query("SELECT count() FROM cluster(cluster0, default, a)")) == TSV("85\n")
assert TSV(self.cluster.instances['s1_0_0'].query("SELECT count(), uniqExact(date) FROM cluster(cluster1, default, b)")) == TSV("85\t85\n")
assert TSV(self.cluster.instances['s1_0_0'].query("SELECT DISTINCT d % 2 FROM b")) == TSV("1\n")
assert TSV(self.cluster.instances['s1_1_0'].query("SELECT DISTINCT d % 2 FROM b")) == TSV("0\n")
assert TSV(self.cluster.instances['s1_0_0'].query("SELECT uniqExact(partition) IN (12, 13) FROM WHERE active AND database='default' AND table='b'")) == TSV("1\n")
assert TSV(self.cluster.instances['s1_1_0'].query("SELECT uniqExact(partition) IN (12, 13) FROM WHERE active AND database='default' AND table='b'")) == TSV("1\n")
instance = cluster.instances['s0_0_0']
ddl_check_query(instance, "DROP TABLE a ON CLUSTER cluster0")
ddl_check_query(instance, "DROP TABLE b ON CLUSTER cluster1")
def execute_task(task, cmd_options):
zk = cluster.get_kazoo_client('zoo1')
print "Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1])
zk_task_path = "/clickhouse-copier/task_simple"
zk_task_path = task.zk_task_path
copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task0_description.xml'), 'r').read()
zk.create(zk_task_path + "/description", copier_task_config)
zk.create(zk_task_path + "/description", task.copier_task_config)
# Run cluster-copier processes on each node
docker_api = docker.from_env().api
@ -97,7 +156,7 @@ def _test_copying(cmd_options):
cmd = ['/usr/bin/clickhouse', 'copier',
'--config', '/etc/clickhouse-server/config-preprocessed.xml',
'--task-path', '/clickhouse-copier/task_simple',
'--task-path', zk_task_path,
'--base-dir', '/var/log/clickhouse-server/copier']
cmd += cmd_options
@ -119,27 +178,26 @@ def _test_copying(cmd_options):
assert res['ExitCode'] == 0, "Instance: {} ({}). Info: {}".format(, instance.ip_address, repr(res))
assert TSV(cluster.instances['s0_0_0'].query("SELECT count() FROM hits_all")) == TSV("1002\n")
assert TSV(cluster.instances['s1_0_0'].query("SELECT count() FROM hits_all")) == TSV("1002\n")
assert TSV(cluster.instances['s1_0_0'].query("SELECT DISTINCT d % 2 FROM hits")) == TSV("1\n")
assert TSV(cluster.instances['s1_1_0'].query("SELECT DISTINCT d % 2 FROM hits")) == TSV("0\n")
zk.delete(zk_task_path, recursive=True)
ddl_check_query(instance, "DROP TABLE hits_all ON CLUSTER cluster0")
ddl_check_query(instance, "DROP TABLE hits_all ON CLUSTER cluster1")
ddl_check_query(instance, "DROP TABLE hits ON CLUSTER cluster0")
ddl_check_query(instance, "DROP TABLE hits ON CLUSTER cluster1")
zk.delete(zk_task_path, recursive=True)
def test_copy_simple(started_cluster):
def test_copy1_simple(started_cluster):
execute_task(Task1(started_cluster), [])
def test_copy_with_recovering(started_cluster):
_test_copying(['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
def test_copy1_with_recovering(started_cluster):
execute_task(Task1(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
def test_copy_month_to_week_partition(started_cluster):
execute_task(Task2(started_cluster), [])
def test_copy_month_to_week_partition(started_cluster):
execute_task(Task2(started_cluster), ['--copy-fault-probability', str(0.1)])
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in cluster.instances.items():