support for ReplicatedTable creation without arguments

Nikita Mikhaylov 2021-06-02 15:46:37 +03:00
parent 312bb96eeb
commit 4382aac25d
5 changed files with 140 additions and 15 deletions

View File

@@ -605,12 +605,9 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
    Settings settings_push = task_cluster->settings_push;
    ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_NODE;
    UInt64 max_successful_executions_per_shard = 0;
    if (settings_push.replication_alter_partitions_sync == 1)
    {
        execution_mode = ClusterExecutionMode::ON_EACH_SHARD;
        max_successful_executions_per_shard = 1;
    }

    query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
        ((partition_name == "'all'") ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name +
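
For context, the statement assembled above attaches each copied partition piece to the destination table. A rough Python sketch of the same concatenation (the table and partition values are illustrative, and the tail of the statement is truncated in the hunk):

def build_attach_query(quoted_table, partition_name):
    # Mirrors the concatenation above: the special 'all' partition is attached
    # by ID, any other partition by its value.
    attach_clause = " ATTACH PARTITION ID " if partition_name == "'all'" else " ATTACH PARTITION "
    return " ALTER TABLE " + quoted_table + attach_clause + partition_name

print(build_attach_query("db.hits", "201905"))   # " ALTER TABLE db.hits ATTACH PARTITION 201905"
print(build_attach_query("db.hits", "'all'"))    # " ALTER TABLE db.hits ATTACH PARTITION ID 'all'"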
@@ -637,9 +634,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
            task_table.cluster_push->getShardCount());

        if (num_nodes != task_table.cluster_push->getShardCount())
        {
            return TaskStatus::Error;
        }
    }
    else
    {

View File

@@ -89,7 +89,7 @@ struct TaskTable
    String engine_push_zk_path;
    bool is_replicated_table;

    ASTPtr rewriteReplicatedCreateQueryToPlain();
    ASTPtr rewriteReplicatedCreateQueryToPlain() const;

    /*
     * A Distributed table definition used to split data
@@ -363,7 +363,7 @@ inline void TaskTable::initShards(RandomEngine && random_engine)
    std::uniform_int_distribution<UInt8> get_urand(0, std::numeric_limits<UInt8>::max());

    // Compute the priority
    for (auto & shard_info : cluster_pull->getShardsInfo())
    for (const auto & shard_info : cluster_pull->getShardsInfo())
    {
        TaskShardPtr task_shard = std::make_shared<TaskShard>(*this, shard_info);
        const auto & replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster());
@@ -389,7 +389,7 @@ inline void TaskTable::initShards(RandomEngine && random_engine)
    local_shards.assign(all_shards.begin(), it_first_remote);
}

inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain()
inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain() const
{
    ASTPtr prev_engine_push_ast = engine_push_ast->clone();
@@ -403,9 +403,15 @@ inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain()
    {
        auto & replicated_table_arguments = new_engine_ast.arguments->children;

        /// Delete first two arguments of Replicated...MergeTree() table.
        replicated_table_arguments.erase(replicated_table_arguments.begin());
        replicated_table_arguments.erase(replicated_table_arguments.begin());

        /// When the Atomic database engine is used, ReplicatedMergeTree tables
        /// may be created without arguments.
        if (!replicated_table_arguments.empty())
        {
            /// Delete first two arguments of Replicated...MergeTree() table.
            replicated_table_arguments.erase(replicated_table_arguments.begin());
            replicated_table_arguments.erase(replicated_table_arguments.begin());
        }
    }

    return new_storage_ast.clone();
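
The new guard is the heart of the change: as the added comment says, a ReplicatedMergeTree table created under the Atomic database engine may carry no explicit ZooKeeper path and replica name, so there is nothing to erase. A minimal Python sketch of the idea, with a plain list standing in for the AST argument children (the engine-name rewrite is an assumption based on the function's name and does not appear in this hunk):

def rewrite_replicated_engine_to_plain(engine_name, engine_args):
    # Assumed: the plain engine name is obtained by dropping the "Replicated" prefix.
    plain_name = engine_name.replace("Replicated", "", 1)
    # Tables created under the Atomic database engine may have no arguments at all,
    # so only drop the ZooKeeper path and replica name when they are present.
    plain_args = engine_args[2:] if engine_args else []
    return plain_name, plain_args

# ReplicatedMergeTree('/clickhouse/tables/{shard}/t', '{replica}')  ->  ('MergeTree', [])
print(rewrite_replicated_engine_to_plain(
    "ReplicatedMergeTree", ["'/clickhouse/tables/{shard}/t'", "'{replica}'"]))
# ReplicatedMergeTree() created without arguments  ->  ('MergeTree', [])
print(rewrite_replicated_engine_to_plain("ReplicatedMergeTree", []))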
@@ -420,7 +426,7 @@ inline String DB::TaskShard::getDescription() const
inline String DB::TaskShard::getHostNameExample() const
{
    auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
    const auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
    return replicas.at(0).readableString();
}

View File

@@ -0,0 +1,21 @@
<?xml version="1.0"?>
<yandex>
    <remote_servers>
        <source_trivial_cluster>
            <shard>
                <replica>
                    <host>s0_0_0</host>
                    <port>9000</port>
                </replica>
            </shard>
        </source_trivial_cluster>
        <destination_trivial_cluster>
            <shard>
                <replica>
                    <host>s1_0_0</host>
                    <port>9000</port>
                </replica>
            </shard>
        </destination_trivial_cluster>
    </remote_servers>
</yandex>

View File

@@ -0,0 +1,64 @@
<?xml version="1.0"?>
<yandex>
    <!-- How many simultaneous workers are possible -->
    <max_workers>3</max_workers>

    <!-- Common settings for pull and push operations -->
    <settings>
        <connect_timeout>1</connect_timeout>
    </settings>

    <!-- Settings used to fetch data -->
    <settings_pull>
        <max_rows_in_distinct>0</max_rows_in_distinct>
    </settings_pull>

    <!-- Settings used to insert data -->
    <settings_push>
    </settings_push>

    <!-- Tasks -->
    <tables>
        <hits>
            <cluster_pull>source_trivial_cluster</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>trivial_without_arguments</table_pull>

            <cluster_push>destination_trivial_cluster</cluster_push>
            <database_push>default</database_push>
            <table_push>trivial_without_arguments</table_push>

            <!-- Engine of destination tables -->
            <engine>ENGINE=ReplicatedMergeTree() PARTITION BY d % 5 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16</engine>

            <!-- Which sharding key to use while copying -->
            <sharding_key>d + 1</sharding_key>

            <!-- Optional expression that filters the copied data -->
            <where_condition>d - d = 0</where_condition>
        </hits>
    </tables>

    <!-- Configuration of clusters -->
    <remote_servers>
        <source_trivial_cluster>
            <shard>
                <replica>
                    <host>s0_0_0</host>
                    <port>9000</port>
                </replica>
            </shard>
        </source_trivial_cluster>

        <destination_trivial_cluster>
            <shard>
                <replica>
                    <host>s1_0_0</host>
                    <port>9000</port>
                </replica>
            </shard>
        </destination_trivial_cluster>
    </remote_servers>
</yandex>
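
This is the task description the new test feeds to clickhouse-copier. Outside the test harness, such a description is typically uploaded to the ZooKeeper path the copier watches; a rough sketch with kazoo, where the ZooKeeper address and the "description" child node are assumptions mirroring the zk_task_path used in the test below:

from kazoo.client import KazooClient

task_path = "/clickhouse-copier/task_trivial_without_arguments"

zk = KazooClient(hosts="localhost:2181")  # assumed ZooKeeper address
zk.start()
with open("task_trivial_without_arguments.xml", "rb") as f:
    task_config = f.read()
# Upload the XML above so a copier started with --task-path pointing at task_path can pick it up.
zk.ensure_path(task_path)
zk.create(task_path + "/description", task_config)
zk.stop()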

View File

@@ -35,7 +35,7 @@ def started_cluster():
        for replica_name in replicas:
            name = "s{}_{}_{}".format(cluster_name, shard_name, replica_name)
            cluster.add_instance(name,
                                 main_configs=[], user_configs=[],
                                 main_configs=["configs/conf.d/clusters_trivial.xml"], user_configs=[],
                                 macros={"cluster": cluster_name, "shard": shard_name, "replica": replica_name},
                                 with_zookeeper=True)
@@ -63,7 +63,7 @@ class TaskTrivial:
            node.query("DROP DATABASE IF EXISTS default")
            node.query("CREATE DATABASE IF NOT EXISTS default")

        source.query("CREATE TABLE trivial (d UInt64, d1 UInt64 MATERIALIZED d+1) "
        source.query("CREATE TABLE trivial (d UInt64, d1 UInt64 MATERIALIZED d+1)"
                     "ENGINE=ReplicatedMergeTree('/clickhouse/tables/source_trivial_cluster/1/trivial', '1') "
                     "PARTITION BY d % 5 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16")
@@ -85,6 +85,42 @@ class TaskTrivial:
            node.query("DROP TABLE trivial")


class TaskReplicatedWithoutArguments:
    def __init__(self, cluster):
        self.cluster = cluster
        self.zk_task_path = "/clickhouse-copier/task_trivial_without_arguments"
        self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_trivial_without_arguments.xml'), 'r').read()

    def start(self):
        source = cluster.instances['s0_0_0']
        destination = cluster.instances['s1_0_0']

        for node in [source, destination]:
            node.query("DROP DATABASE IF EXISTS default")
            node.query("CREATE DATABASE IF NOT EXISTS default")

        source.query("CREATE TABLE trivial_without_arguments ON CLUSTER source_trivial_cluster (d UInt64, d1 UInt64 MATERIALIZED d+1) "
                     "ENGINE=ReplicatedMergeTree() "
                     "PARTITION BY d % 5 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16")

        source.query("INSERT INTO trivial_without_arguments SELECT * FROM system.numbers LIMIT 1002",
                     settings={"insert_distributed_sync": 1})

    def check(self):
        zk = cluster.get_kazoo_client('zoo1')
        status_data, _ = zk.get(self.zk_task_path + "/status")
        assert status_data == b'{"hits":{"all_partitions_count":5,"processed_partitions_count":5}}'

        source = cluster.instances['s0_0_0']
        destination = cluster.instances['s1_0_0']

        assert TSV(source.query("SELECT count() FROM trivial_without_arguments")) == TSV("1002\n")
        assert TSV(destination.query("SELECT count() FROM trivial_without_arguments")) == TSV("1002\n")

        for node in [source, destination]:
            node.query("DROP TABLE trivial_without_arguments")
def execute_task(task, cmd_options):
    task.start()

@@ -151,3 +187,6 @@ def test_trivial_copy(started_cluster, use_sample_offset):
    else:
        execute_task(TaskTrivial(started_cluster, use_sample_offset), [])


def test_trivial_without_arguments(started_cluster):
    execute_task(TaskReplicatedWithoutArguments(started_cluster), [])
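
A side note on the "/status" assertion in TaskReplicatedWithoutArguments.check above: the copier records its progress as JSON under the task path, and the expected counts follow from the table definition (PARTITION BY d % 5 over 1002 consecutive numbers yields partitions 0-4). A tiny illustration using only values taken from the assert itself:

import json

status = json.loads('{"hits":{"all_partitions_count":5,"processed_partitions_count":5}}')
# Five partitions exist because d % 5 maps the inserted numbers onto partitions 0..4,
# and the copier must report all of them as processed.
assert status["hits"]["all_partitions_count"] == 5
assert status["hits"]["processed_partitions_count"] == 5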