rewrite replicated tables to plain merge tree

This commit is contained in:
Nikita Mikhaylov 2020-03-03 19:36:47 +03:00
parent d9765991a7
commit 649df01627
2 changed files with 16 additions and 9 deletions

View File

@ -1139,7 +1139,7 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(
auto new_engine_push_ast = task_table.engine_push_ast;
if (task_table.isReplicatedTable())
{
new_engine_push_ast = task_table.rewriteParamsForReplicatedTableFor(current_piece_number);
new_engine_push_ast = task_table.rewriteReplicatedCreateQueryToPlain();
}
auto create_query_push_ast = rewriteCreateQueryStorage(

View File

@ -39,7 +39,7 @@ struct TaskTable {
String getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const;
String getReplicatedEngineFirstArgumentForCurrentPiece(const size_t piece_number) const;
[[maybe_unused]] String getReplicatedEngineFirstArgumentForCurrentPiece(const size_t piece_number) const;
bool isReplicatedTable() { return engine_push_zk_path != ""; }
@ -73,7 +73,7 @@ struct TaskTable {
/// First argument of Replicated...MergeTree()
String engine_push_zk_path;
ASTPtr rewriteParamsForReplicatedTableFor(const size_t current_piece_number) const;
ASTPtr rewriteReplicatedCreateQueryToPlain();
/*
* A Distributed table definition used to split data
@ -358,19 +358,26 @@ inline void TaskTable::initShards(RandomEngine && random_engine)
inline String TaskTable::getReplicatedEngineFirstArgumentForCurrentPiece(const size_t piece_number) const
{
assert (engine_push_zk_path != "");
return engine_push_zk_path + "/" + toString(piece_number);
return engine_push_zk_path + "/piece_" + toString(piece_number);
}
inline ASTPtr TaskTable::rewriteParamsForReplicatedTableFor(const size_t current_piece_number) const
inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain()
{
const auto & new_engine_ast = engine_push_ast->clone()->as<ASTFunction &>();
ASTPtr prev_engine_push_ast = engine_push_ast->clone();
auto & new_storage_ast = prev_engine_push_ast->as<ASTStorage &>();
auto & new_engine_ast = new_storage_ast.engine->as<ASTFunction &>();
auto & replicated_table_arguments = new_engine_ast.arguments->children;
auto & zk_table_path_ast = replicated_table_arguments[0]->as<ASTLiteral &>();
zk_table_path_ast.value = getReplicatedEngineFirstArgumentForCurrentPiece(current_piece_number);
/// Delete first two arguments of Replicated...MergeTree() table.
replicated_table_arguments.erase(replicated_table_arguments.begin());
replicated_table_arguments.erase(replicated_table_arguments.begin());
return new_engine_ast.clone();
/// Remove replicated from name
new_engine_ast.name = new_engine_ast.name.substr(10);
return new_storage_ast.clone();
}