From 649df01627c2e9e388b0d21a5119072080ed53da Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Tue, 3 Mar 2020 19:36:47 +0300 Subject: [PATCH] rewrite replicated tables to plain merge tree --- dbms/programs/copier/ClusterCopier.cpp | 2 +- dbms/programs/copier/TaskTableAndShard.h | 23 +++++++++++++++-------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/dbms/programs/copier/ClusterCopier.cpp b/dbms/programs/copier/ClusterCopier.cpp index 1a7ddce7e5e..cdf530e8437 100644 --- a/dbms/programs/copier/ClusterCopier.cpp +++ b/dbms/programs/copier/ClusterCopier.cpp @@ -1139,7 +1139,7 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl( auto new_engine_push_ast = task_table.engine_push_ast; if (task_table.isReplicatedTable()) { - new_engine_push_ast = task_table.rewriteParamsForReplicatedTableFor(current_piece_number); + new_engine_push_ast = task_table.rewriteReplicatedCreateQueryToPlain(); } auto create_query_push_ast = rewriteCreateQueryStorage( diff --git a/dbms/programs/copier/TaskTableAndShard.h b/dbms/programs/copier/TaskTableAndShard.h index b9f90dc1a77..c879152d6c9 100644 --- a/dbms/programs/copier/TaskTableAndShard.h +++ b/dbms/programs/copier/TaskTableAndShard.h @@ -39,7 +39,7 @@ struct TaskTable { String getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const; - String getReplicatedEngineFirstArgumentForCurrentPiece(const size_t piece_number) const; + [[maybe_unused]] String getReplicatedEngineFirstArgumentForCurrentPiece(const size_t piece_number) const; bool isReplicatedTable() { return engine_push_zk_path != ""; } @@ -73,7 +73,7 @@ struct TaskTable { /// First argument of Replicated...MergeTree() String engine_push_zk_path; - ASTPtr rewriteParamsForReplicatedTableFor(const size_t current_piece_number) const; + ASTPtr rewriteReplicatedCreateQueryToPlain(); /* * A Distributed table definition used to split data @@ -358,19 +358,26 @@ inline void TaskTable::initShards(RandomEngine && random_engine) inline String TaskTable::getReplicatedEngineFirstArgumentForCurrentPiece(const size_t piece_number) const { assert (engine_push_zk_path != ""); - return engine_push_zk_path + "/" + toString(piece_number); + return engine_push_zk_path + "/piece_" + toString(piece_number); } -inline ASTPtr TaskTable::rewriteParamsForReplicatedTableFor(const size_t current_piece_number) const +inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain() { - const auto & new_engine_ast = engine_push_ast->clone()->as(); + ASTPtr prev_engine_push_ast = engine_push_ast->clone(); + + auto & new_storage_ast = prev_engine_push_ast->as(); + auto & new_engine_ast = new_storage_ast.engine->as(); auto & replicated_table_arguments = new_engine_ast.arguments->children; - auto & zk_table_path_ast = replicated_table_arguments[0]->as(); - zk_table_path_ast.value = getReplicatedEngineFirstArgumentForCurrentPiece(current_piece_number); + /// Delete first two arguments of Replicated...MergeTree() table. + replicated_table_arguments.erase(replicated_table_arguments.begin()); + replicated_table_arguments.erase(replicated_table_arguments.begin()); - return new_engine_ast.clone(); + /// Remove replicated from name + new_engine_ast.name = new_engine_ast.name.substr(10); + + return new_storage_ast.clone(); }