partition deletion added

This commit is contained in:
Nikita Mikhaylov 2020-03-18 21:35:58 +03:00
parent 23f027e6da
commit c33771105f
2 changed files with 33 additions and 7 deletions

View File

@ -1580,6 +1580,34 @@ void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
}
}
void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskTable & task_table, const String & partition_name)
{
LOG_DEBUG(log, "Try drop partition partition from all helping tables.");
for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
{
DatabaseAndTableName original_table = task_table.table_push;
DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
String query = "ALTER TABLE " + getQuotedTable(helping_table) + " DROP PARTITION " + partition_name;
const ClusterPtr & cluster_push = task_table.cluster_push;
Settings settings_push = task_cluster->settings_push;
LOG_DEBUG(log, "Execute distributed DROP PARTITION: " << query);
/// We have to drop partition_piece on each replica
UInt64 num_nodes = executeQueryOnCluster(
cluster_push, query,
nullptr,
&settings_push,
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
LOG_DEBUG(log, "DROP PARTITION query was successfully executed on " << toString(num_nodes) << " nodes.");
}
LOG_DEBUG(log, "All helping tables dropped partition " << partition_name);
}
String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings * settings)
{
String query = "SHOW CREATE TABLE " + getQuotedTable(table);
@ -1655,8 +1683,6 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout
task_shard.list_of_split_tables_on_shard[piece_number],
storage_piece_split_ast);
std::cout << "anime" << queryToString(create_table_split_piece_ast) << std::endl;
dropAndCreateLocalTable(create_table_split_piece_ast);
}
}
@ -1819,11 +1845,6 @@ UInt64 ClusterCopier::executeQueryOnCluster(
UInt64 num_replicas = cluster->getShardsAddresses().at(shard_index).size();
for (size_t i = 0; i < num_replicas; ++i)
{
std::cout << "host_name " << cluster->getShardsAddresses().at(shard_index)[i].host_name
<< " port " << cluster->getShardsAddresses().at(shard_index)[i].port << std::endl;
}
origin_replicas_number += num_replicas;
UInt64 num_local_replicas = shard.getLocalNodeCount();
UInt64 num_remote_replicas = num_replicas - num_local_replicas;

View File

@ -149,6 +149,11 @@ protected:
void dropHelpingTables(const TaskTable & task_table);
/// Is used for usage less disk space.
/// After all pieces were successfully moved to original destination
/// table we can get rid of partition pieces (partitions in helping tables).
void dropParticularPartitionPieceFromAllHelpingTables(const TaskTable & task_table, const String & partition_name);
String getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings * settings = nullptr);
ASTPtr getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard);