Cosmetic change for move partition feature

This commit is contained in:
Guillaume Tassery 2019-07-26 11:35:47 +02:00
parent f52fed5cea
commit 6fdedfc0e5
4 changed files with 25 additions and 28 deletions

View File

@ -2771,7 +2771,6 @@ MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(IStorage * sour
return *src_data; return *src_data;
} }
MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(const StoragePtr & source_table) const MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(const StoragePtr & source_table) const
{ {
return checkStructureAndGetMergeTreeData(source_table.get()); return checkStructureAndGetMergeTreeData(source_table.get());

View File

@ -956,9 +956,9 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::MOVE_PARTITION: case PartitionCommand::MOVE_PARTITION:
{ {
checkPartitionCanBeDropped(command.partition); checkPartitionCanBeDropped(command.partition);
String from_database = command.from_database.empty() ? context.getCurrentDatabase() : command.from_database; String dest_database = command.from_database.empty() ? context.getCurrentDatabase() : command.from_database;
auto from_storage = context.getTable(from_database, command.from_table); auto dest_storage = context.getTable(dest_database, command.from_table);
movePartitionTo(from_storage, command.partition, context); movePartitionTo(dest_storage, command.partition, context);
} }
break; break;
@ -1146,15 +1146,19 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
} }
} }
void StorageMergeTree::movePartitionTo(const StoragePtr & source_table, const ASTPtr & partition, const Context & context) void StorageMergeTree::movePartitionTo(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context)
{ {
auto source_table_storage = (StorageMergeTree *)(source_table.get()); auto dest_table_storage = std::dynamic_pointer_cast<StorageMergeTree>(dest_table);
if (!dest_table_storage)
throw Exception("Table " + this->getTableName() + " supports attachPartitionFrom only for MergeTree family of table engines."
" Got " + dest_table->getName(), ErrorCodes::NOT_IMPLEMENTED);
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId()); auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId()); auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId());
Stopwatch watch; Stopwatch watch;
MergeTreeData & src_data = *dynamic_cast<MergeTreeData *>(this); MergeTreeData & src_data = dest_table_storage->checkStructureAndGetMergeTreeData(this);
String partition_id = getPartitionIDFromQuery(partition, context); String partition_id = getPartitionIDFromQuery(partition, context);
DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
@ -1164,7 +1168,7 @@ void StorageMergeTree::movePartitionTo(const StoragePtr & source_table, const AS
for (const DataPartPtr & src_part : src_parts) for (const DataPartPtr & src_part : src_parts)
{ {
if (!source_table_storage->canReplacePartition(src_part)) if (!dest_table_storage->canReplacePartition(src_part))
throw Exception( throw Exception(
"Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table", "Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
ErrorCodes::LOGICAL_ERROR); ErrorCodes::LOGICAL_ERROR);
@ -1182,6 +1186,7 @@ void StorageMergeTree::movePartitionTo(const StoragePtr & source_table, const AS
return; return;
MergeTreePartInfo drop_range; MergeTreePartInfo drop_range;
drop_range.partition_id = partition_id; drop_range.partition_id = partition_id;
drop_range.min_block = 0; drop_range.min_block = 0;
drop_range.max_block = increment.get(); // there will be a "hole" in block numbers drop_range.max_block = increment.get(); // there will be a "hole" in block numbers
@ -1193,18 +1198,18 @@ void StorageMergeTree::movePartitionTo(const StoragePtr & source_table, const AS
{ {
/// Here we use the transaction just like RAII since rare errors in renameTempPartAndReplace() are possible /// Here we use the transaction just like RAII since rare errors in renameTempPartAndReplace() are possible
/// and we should be able to rollback already added (Precomitted) parts /// and we should be able to rollback already added (Precomitted) parts
Transaction transaction(*source_table_storage); Transaction transaction(*dest_table_storage);
auto data_parts_lock = lockParts(); auto data_parts_lock = lockParts();
/// Populate transaction /// Populate transaction
for (MutableDataPartPtr & part : dst_parts) for (MutableDataPartPtr & part : dst_parts)
source_table_storage->renameTempPartAndReplace(part, &increment, &transaction, data_parts_lock); dest_table_storage->renameTempPartAndReplace(part, &increment, &transaction, data_parts_lock);
transaction.commit(&data_parts_lock); transaction.commit(&data_parts_lock);
/// If it is REPLACE (not ATTACH), remove all parts which max_block_number less then min_block_number of the first new block /// If it is REPLACE (not ATTACH), remove all parts which max_block_number less then min_block_number of the first new block
/*source_table_storage->*/removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock); removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock);
} }
PartLog::addNewParts(global_context, dst_parts, watch.elapsed()); PartLog::addNewParts(global_context, dst_parts, watch.elapsed());
@ -1214,10 +1219,8 @@ void StorageMergeTree::movePartitionTo(const StoragePtr & source_table, const AS
PartLog::addNewParts(global_context, dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException()); PartLog::addNewParts(global_context, dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException());
throw; throw;
} }
} }
ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type) ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type)
{ {
if (action_type == ActionLocks::PartsMerge) if (action_type == ActionLocks::PartsMerge)
@ -1282,7 +1285,3 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, const Context & c
} }
} }
#include "StorageMergeTree.h"
#include <Databases/IDatabase.h>
#include <Common/escapeForFileName.h>

View File

@ -3368,9 +3368,9 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
case PartitionCommand::MOVE_PARTITION: case PartitionCommand::MOVE_PARTITION:
{ {
checkPartitionCanBeDropped(command.partition); checkPartitionCanBeDropped(command.partition);
String from_database = command.from_database.empty() ? query_context.getCurrentDatabase() : command.from_database; String dest_database = command.from_database.empty() ? query_context.getCurrentDatabase() : command.from_database;
auto from_storage = query_context.getTable(from_database, command.from_table); auto dest_storage = query_context.getTable(dest_database, command.from_table);
movePartitionTo(from_storage, command.partition, command.replace, query_context); movePartitionTo(dest_storage, command.partition, query_context);
} }
break; break;
@ -4917,8 +4917,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
waitForAllReplicasToProcessLogEntry(entry); waitForAllReplicasToProcessLogEntry(entry);
} }
void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & /*source_table*/, const ASTPtr & /*partition*/, bool /*replace*/, void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & /*source_table*/, const ASTPtr & /*partition*/, const Context & /*context*/)
const Context & /*context*/)
{ {
// TODO: Implement // TODO: Implement
} }

View File

@ -503,7 +503,7 @@ private:
void dropPartition(const ASTPtr & query, const ASTPtr & partition, bool detach, const Context & query_context); void dropPartition(const ASTPtr & query, const ASTPtr & partition, bool detach, const Context & query_context);
void attachPartition(const ASTPtr & partition, bool part, const Context & query_context); void attachPartition(const ASTPtr & partition, bool part, const Context & query_context);
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context); void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context);
void movePartitionTo(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context); void movePartitionTo(const StoragePtr & source_table, const ASTPtr & partition, const Context & query_context);
void fetchPartition(const ASTPtr & partition, const String & from, const Context & query_context); void fetchPartition(const ASTPtr & partition, const String & from, const Context & query_context);
protected: protected: