Reading the code of ALTER MOVE PARTITION

This commit is contained in:
Alexey Milovidov 2020-02-21 19:57:40 +03:00
parent 93fddc7b46
commit 0dbd885679
2 changed files with 15 additions and 15 deletions

View File

@ -1042,6 +1042,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::MoveDestinationType::VOLUME: case PartitionCommand::MoveDestinationType::VOLUME:
movePartitionToVolume(command.partition, command.move_destination_name, command.part, context); movePartitionToVolume(command.partition, command.move_destination_name, command.part, context);
break; break;
case PartitionCommand::MoveDestinationType::TABLE: case PartitionCommand::MoveDestinationType::TABLE:
checkPartitionCanBeDropped(command.partition); checkPartitionCanBeDropped(command.partition);
String dest_database = command.to_database.empty() ? context.getCurrentDatabase() : command.to_database; String dest_database = command.to_database.empty() ? context.getCurrentDatabase() : command.to_database;
@ -1231,7 +1232,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
auto dest_table_storage = std::dynamic_pointer_cast<StorageMergeTree>(dest_table); auto dest_table_storage = std::dynamic_pointer_cast<StorageMergeTree>(dest_table);
if (!dest_table_storage) if (!dest_table_storage)
throw Exception("Table " + getStorageID().getNameForLogs() + " supports attachPartitionFrom only for MergeTree family of table engines." throw Exception("Table " + getStorageID().getNameForLogs() + " supports movePartitionToTable only for MergeTree family of table engines."
" Got " + dest_table->getName(), ErrorCodes::NOT_IMPLEMENTED); " Got " + dest_table->getName(), ErrorCodes::NOT_IMPLEMENTED);
if (dest_table_storage->getStoragePolicy() != this->getStoragePolicy()) if (dest_table_storage->getStoragePolicy() != this->getStoragePolicy())
throw Exception("Destination table " + dest_table_storage->getStorageID().getNameForLogs() + throw Exception("Destination table " + dest_table_storage->getStorageID().getNameForLogs() +
@ -1246,13 +1247,13 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
MutableDataPartsVector dst_parts; MutableDataPartsVector dst_parts;
static const String TMP_PREFIX = "tmp_replace_from_"; static const String TMP_PREFIX = "tmp_move_from_";
for (const DataPartPtr & src_part : src_parts) for (const DataPartPtr & src_part : src_parts)
{ {
if (!dest_table_storage->canReplacePartition(src_part)) if (!dest_table_storage->canReplacePartition(src_part))
throw Exception( throw Exception(
"Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table", "Cannot move partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
ErrorCodes::LOGICAL_ERROR); ErrorCodes::LOGICAL_ERROR);
/// This will generate unique name in scope of current server process. /// This will generate unique name in scope of current server process.
@ -1263,18 +1264,11 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
dst_parts.emplace_back(dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info)); dst_parts.emplace_back(dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info));
} }
/// ATTACH empty part set /// empty part set
if (dst_parts.empty()) if (dst_parts.empty())
return; return;
MergeTreePartInfo drop_range; /// Move new parts to the destination table. NOTE It doesn't look atomic.
drop_range.partition_id = partition_id;
drop_range.min_block = 0;
drop_range.max_block = increment.get(); // there will be a "hole" in block numbers
drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();
/// Atomically add new parts and remove old ones
try try
{ {
{ {

View File

@ -3551,9 +3551,11 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
case PartitionCommand::MoveDestinationType::DISK: case PartitionCommand::MoveDestinationType::DISK:
movePartitionToDisk(command.partition, command.move_destination_name, command.part, query_context); movePartitionToDisk(command.partition, command.move_destination_name, command.part, query_context);
break; break;
case PartitionCommand::MoveDestinationType::VOLUME: case PartitionCommand::MoveDestinationType::VOLUME:
movePartitionToVolume(command.partition, command.move_destination_name, command.part, query_context); movePartitionToVolume(command.partition, command.move_destination_name, command.part, query_context);
break; break;
case PartitionCommand::MoveDestinationType::TABLE: case PartitionCommand::MoveDestinationType::TABLE:
checkPartitionCanBeDropped(command.partition); checkPartitionCanBeDropped(command.partition);
String dest_database = command.to_database.empty() ? query_context.getCurrentDatabase() : command.to_database; String dest_database = command.to_database.empty() ? query_context.getCurrentDatabase() : command.to_database;
@ -5118,7 +5120,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
auto dest_table_storage = std::dynamic_pointer_cast<StorageReplicatedMergeTree>(dest_table); auto dest_table_storage = std::dynamic_pointer_cast<StorageReplicatedMergeTree>(dest_table);
if (!dest_table_storage) if (!dest_table_storage)
throw Exception("Table " + getStorageID().getNameForLogs() + " supports attachPartitionFrom only for ReplicatedMergeTree family of table engines." throw Exception("Table " + getStorageID().getNameForLogs() + " supports movePartitionToTable only for ReplicatedMergeTree family of table engines."
" Got " + dest_table->getName(), ErrorCodes::NOT_IMPLEMENTED); " Got " + dest_table->getName(), ErrorCodes::NOT_IMPLEMENTED);
if (dest_table_storage->getStoragePolicy() != this->getStoragePolicy()) if (dest_table_storage->getStoragePolicy() != this->getStoragePolicy())
throw Exception("Destination table " + dest_table_storage->getStorageID().getNameForLogs() + throw Exception("Destination table " + dest_table_storage->getStorageID().getNameForLogs() +
@ -5140,9 +5142,11 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
LOG_DEBUG(log, "Cloning " << src_all_parts.size() << " parts"); LOG_DEBUG(log, "Cloning " << src_all_parts.size() << " parts");
static const String TMP_PREFIX = "tmp_replace_from_"; static const String TMP_PREFIX = "tmp_move_from_";
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
/// A range for log entry to remove parts from the source table (myself).
MergeTreePartInfo drop_range; MergeTreePartInfo drop_range;
drop_range.partition_id = partition_id; drop_range.partition_id = partition_id;
drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber(); drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber();
@ -5157,13 +5161,15 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
queue.disableMergesInBlockRange(drop_range_fake_part_name); queue.disableMergesInBlockRange(drop_range_fake_part_name);
} }
/// Clone parts into destination table.
for (size_t i = 0; i < src_all_parts.size(); ++i) for (size_t i = 0; i < src_all_parts.size(); ++i)
{ {
auto & src_part = src_all_parts[i]; auto & src_part = src_all_parts[i];
if (!dest_table_storage->canReplacePartition(src_part)) if (!dest_table_storage->canReplacePartition(src_part))
throw Exception( throw Exception(
"Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table", "Cannot move partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
ErrorCodes::LOGICAL_ERROR); ErrorCodes::LOGICAL_ERROR);
String hash_hex = src_part->checksums.getTotalChecksumHex(); String hash_hex = src_part->checksums.getTotalChecksumHex();