From 4382aac25d3156fd218df98913dff7f5c52506e6 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Wed, 2 Jun 2021 15:46:37 +0300 Subject: [PATCH] support for ReplicatedTable creation without arguments --- programs/copier/ClusterCopier.cpp | 7 +- programs/copier/TaskTableAndShard.h | 20 ++++-- .../configs/conf.d/clusters_trivial.xml | 21 ++++++ .../task_trivial_without_arguments.xml | 64 +++++++++++++++++++ .../test_cluster_copier/test_trivial.py | 43 ++++++++++++- 5 files changed, 140 insertions(+), 15 deletions(-) create mode 100644 tests/integration/test_cluster_copier/configs/conf.d/clusters_trivial.xml create mode 100644 tests/integration/test_cluster_copier/task_trivial_without_arguments.xml diff --git a/programs/copier/ClusterCopier.cpp b/programs/copier/ClusterCopier.cpp index 55cb1696ae8..fdd1ad8f6a2 100644 --- a/programs/copier/ClusterCopier.cpp +++ b/programs/copier/ClusterCopier.cpp @@ -605,12 +605,9 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t Settings settings_push = task_cluster->settings_push; ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_NODE; - UInt64 max_successful_executions_per_shard = 0; + if (settings_push.replication_alter_partitions_sync == 1) - { execution_mode = ClusterExecutionMode::ON_EACH_SHARD; - max_successful_executions_per_shard = 1; - } query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) + ((partition_name == "'all'") ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name + @@ -637,9 +634,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t task_table.cluster_push->getShardCount()); if (num_nodes != task_table.cluster_push->getShardCount()) - { return TaskStatus::Error; - } } else { diff --git a/programs/copier/TaskTableAndShard.h b/programs/copier/TaskTableAndShard.h index d15cb3fcd57..30b057440bb 100644 --- a/programs/copier/TaskTableAndShard.h +++ b/programs/copier/TaskTableAndShard.h @@ -89,7 +89,7 @@ struct TaskTable String engine_push_zk_path; bool is_replicated_table; - ASTPtr rewriteReplicatedCreateQueryToPlain(); + ASTPtr rewriteReplicatedCreateQueryToPlain() const; /* * A Distributed table definition used to split data @@ -363,7 +363,7 @@ inline void TaskTable::initShards(RandomEngine && random_engine) std::uniform_int_distribution get_urand(0, std::numeric_limits::max()); // Compute the priority - for (auto & shard_info : cluster_pull->getShardsInfo()) + for (const auto & shard_info : cluster_pull->getShardsInfo()) { TaskShardPtr task_shard = std::make_shared(*this, shard_info); const auto & replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster()); @@ -389,7 +389,7 @@ inline void TaskTable::initShards(RandomEngine && random_engine) local_shards.assign(all_shards.begin(), it_first_remote); } -inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain() +inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain() const { ASTPtr prev_engine_push_ast = engine_push_ast->clone(); @@ -403,9 +403,15 @@ inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain() { auto & replicated_table_arguments = new_engine_ast.arguments->children; - /// Delete first two arguments of Replicated...MergeTree() table. - replicated_table_arguments.erase(replicated_table_arguments.begin()); - replicated_table_arguments.erase(replicated_table_arguments.begin()); + + /// In some cases of Atomic database engine usage ReplicatedMergeTree tables + /// could be created without arguments. + if (!replicated_table_arguments.empty()) + { + /// Delete first two arguments of Replicated...MergeTree() table. + replicated_table_arguments.erase(replicated_table_arguments.begin()); + replicated_table_arguments.erase(replicated_table_arguments.begin()); + } } return new_storage_ast.clone(); @@ -420,7 +426,7 @@ inline String DB::TaskShard::getDescription() const inline String DB::TaskShard::getHostNameExample() const { - auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster()); + const auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster()); return replicas.at(0).readableString(); } diff --git a/tests/integration/test_cluster_copier/configs/conf.d/clusters_trivial.xml b/tests/integration/test_cluster_copier/configs/conf.d/clusters_trivial.xml new file mode 100644 index 00000000000..94abcd766f3 --- /dev/null +++ b/tests/integration/test_cluster_copier/configs/conf.d/clusters_trivial.xml @@ -0,0 +1,21 @@ + + + + + + + s0_0_0 + 9000 + + + + + + + s1_0_0 + 9000 + + + + + diff --git a/tests/integration/test_cluster_copier/task_trivial_without_arguments.xml b/tests/integration/test_cluster_copier/task_trivial_without_arguments.xml new file mode 100644 index 00000000000..d4c50877fa6 --- /dev/null +++ b/tests/integration/test_cluster_copier/task_trivial_without_arguments.xml @@ -0,0 +1,64 @@ + + + + 3 + + + + 1 + + + + + 0 + + + + + + + + + + source_trivial_cluster + default + trivial_without_arguments + + destination_trivial_cluster + default + trivial_without_arguments + + + ENGINE=ReplicatedMergeTree() PARTITION BY d % 5 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16 + + + d + 1 + + + d - d = 0 + + + + + + + + + s0_0_0 + 9000 + + + + + + + + + s1_0_0 + 9000 + + + + + + diff --git a/tests/integration/test_cluster_copier/test_trivial.py b/tests/integration/test_cluster_copier/test_trivial.py index 947c4bee5ca..29a87f71647 100644 --- a/tests/integration/test_cluster_copier/test_trivial.py +++ b/tests/integration/test_cluster_copier/test_trivial.py @@ -35,7 +35,7 @@ def started_cluster(): for replica_name in replicas: name = "s{}_{}_{}".format(cluster_name, shard_name, replica_name) cluster.add_instance(name, - main_configs=[], user_configs=[], + main_configs=["configs/conf.d/clusters_trivial.xml"], user_configs=[], macros={"cluster": cluster_name, "shard": shard_name, "replica": replica_name}, with_zookeeper=True) @@ -63,7 +63,7 @@ class TaskTrivial: node.query("DROP DATABASE IF EXISTS default") node.query("CREATE DATABASE IF NOT EXISTS default") - source.query("CREATE TABLE trivial (d UInt64, d1 UInt64 MATERIALIZED d+1) " + source.query("CREATE TABLE trivial (d UInt64, d1 UInt64 MATERIALIZED d+1)" "ENGINE=ReplicatedMergeTree('/clickhouse/tables/source_trivial_cluster/1/trivial', '1') " "PARTITION BY d % 5 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16") @@ -85,6 +85,42 @@ class TaskTrivial: node.query("DROP TABLE trivial") +class TaskReplicatedWithoutArguments: + def __init__(self, cluster): + self.cluster = cluster + self.zk_task_path = "/clickhouse-copier/task_trivial_without_arguments" + self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_trivial_without_arguments.xml'), 'r').read() + + def start(self): + source = cluster.instances['s0_0_0'] + destination = cluster.instances['s1_0_0'] + + for node in [source, destination]: + node.query("DROP DATABASE IF EXISTS default") + node.query("CREATE DATABASE IF NOT EXISTS default") + + source.query("CREATE TABLE trivial_without_arguments ON CLUSTER source_trivial_cluster (d UInt64, d1 UInt64 MATERIALIZED d+1) " + "ENGINE=ReplicatedMergeTree() " + "PARTITION BY d % 5 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16") + + source.query("INSERT INTO trivial_without_arguments SELECT * FROM system.numbers LIMIT 1002", + settings={"insert_distributed_sync": 1}) + + def check(self): + zk = cluster.get_kazoo_client('zoo1') + status_data, _ = zk.get(self.zk_task_path + "/status") + assert status_data == b'{"hits":{"all_partitions_count":5,"processed_partitions_count":5}}' + + source = cluster.instances['s0_0_0'] + destination = cluster.instances['s1_0_0'] + + assert TSV(source.query("SELECT count() FROM trivial_without_arguments")) == TSV("1002\n") + assert TSV(destination.query("SELECT count() FROM trivial_without_arguments")) == TSV("1002\n") + + for node in [source, destination]: + node.query("DROP TABLE trivial_without_arguments") + + def execute_task(task, cmd_options): task.start() @@ -151,3 +187,6 @@ def test_trivial_copy(started_cluster, use_sample_offset): else: execute_task(TaskTrivial(started_cluster, use_sample_offset), []) + +def test_trivial_without_arguments(started_cluster): + execute_task(TaskReplicatedWithoutArguments(started_cluster), [])