Merge pull request #16875 from CurtizJ/refactor-alter-partition

Avoid code duplication in alterPartition
This commit is contained in:
alesapin 2020-11-12 13:19:59 +03:00 committed by GitHub
commit 2cac5e5d97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 115 additions and 196 deletions

View File

@ -2753,6 +2753,96 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
throw Exception("Cannot move parts because moves are manually disabled", ErrorCodes::ABORTED);
}
void MergeTreeData::fetchPartition(const ASTPtr & /*partition*/, const StorageMetadataPtr & /*metadata_snapshot*/, const String & /*from*/, const Context & /*query_context*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "FETCH PARTITION is not supported by storage {}", getName());
}
Pipe MergeTreeData::alterPartition(
const StorageMetadataPtr & metadata_snapshot,
const PartitionCommands & commands,
const Context & query_context)
{
PartitionCommandsResultInfo result;
for (const PartitionCommand & command : commands)
{
PartitionCommandsResultInfo current_command_results;
switch (command.type)
{
case PartitionCommand::DROP_PARTITION:
if (command.part)
checkPartCanBeDropped(command.partition);
else
checkPartitionCanBeDropped(command.partition);
dropPartition(command.partition, command.detach, command.part, query_context);
break;
case PartitionCommand::DROP_DETACHED_PARTITION:
dropDetached(command.partition, command.part, query_context);
break;
case PartitionCommand::ATTACH_PARTITION:
current_command_results = attachPartition(command.partition, metadata_snapshot, command.part, query_context);
break;
case PartitionCommand::MOVE_PARTITION:
{
switch (*command.move_destination_type)
{
case PartitionCommand::MoveDestinationType::DISK:
movePartitionToDisk(command.partition, command.move_destination_name, command.part, query_context);
break;
case PartitionCommand::MoveDestinationType::VOLUME:
movePartitionToVolume(command.partition, command.move_destination_name, command.part, query_context);
break;
case PartitionCommand::MoveDestinationType::TABLE:
checkPartitionCanBeDropped(command.partition);
String dest_database = query_context.resolveDatabase(command.to_database);
auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context);
movePartitionToTable(dest_storage, command.partition, query_context);
break;
}
}
break;
case PartitionCommand::REPLACE_PARTITION:
{
checkPartitionCanBeDropped(command.partition);
String from_database = query_context.resolveDatabase(command.from_database);
auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table}, query_context);
replacePartitionFrom(from_storage, command.partition, command.replace, query_context);
}
break;
case PartitionCommand::FETCH_PARTITION:
fetchPartition(command.partition, metadata_snapshot, command.from_zookeeper_path, query_context);
break;
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
current_command_results = freezePartition(command.partition, metadata_snapshot, command.with_name, query_context, lock);
}
break;
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
current_command_results = freezeAll(command.with_name, metadata_snapshot, query_context, lock);
}
break;
}
for (auto & command_result : current_command_results)
command_result.command_type = command.typeToString();
result.insert(result.end(), current_command_results.begin(), current_command_results.end());
}
if (query_context.getSettingsRef().alter_partition_verbose_result)
return convertCommandsResultToSource(result);
return {};
}
String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context & context) const
{

View File

@ -563,6 +563,11 @@ public:
void checkPartCanBeDropped(const ASTPtr & part);
Pipe alterPartition(
const StorageMetadataPtr & metadata_snapshot,
const PartitionCommands & commands,
const Context & query_context) override;
size_t getColumnCompressedSize(const std::string & name) const
{
auto lock = lockParts();
@ -868,8 +873,17 @@ protected:
using MatcherFn = std::function<bool(const DataPartPtr &)>;
PartitionCommandsResultInfo freezePartitionsByMatcher(MatcherFn matcher, const StorageMetadataPtr & metadata_snapshot, const String & with_name, const Context & context);
// Partition helpers
bool canReplacePartition(const DataPartPtr & src_part) const;
virtual void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, const Context & context) = 0;
virtual PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, const Context & context) = 0;
virtual void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context) = 0;
virtual void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context) = 0;
/// Makes sense only for replicated tables
virtual void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, const Context & query_context);
void writePartLog(
PartLogElement::Type type,
const ExecutionStatus & execution_status,

View File

@ -1062,95 +1062,6 @@ bool StorageMergeTree::optimize(
return true;
}
Pipe StorageMergeTree::alterPartition(
const StorageMetadataPtr & metadata_snapshot,
const PartitionCommands & commands,
const Context & query_context)
{
PartitionCommandsResultInfo result;
for (const PartitionCommand & command : commands)
{
PartitionCommandsResultInfo current_command_results;
switch (command.type)
{
case PartitionCommand::DROP_PARTITION:
if (command.part)
checkPartCanBeDropped(command.partition);
else
checkPartitionCanBeDropped(command.partition);
dropPartition(command.partition, command.detach, command.part, query_context);
break;
case PartitionCommand::DROP_DETACHED_PARTITION:
dropDetached(command.partition, command.part, query_context);
break;
case PartitionCommand::ATTACH_PARTITION:
current_command_results = attachPartition(command.partition, command.part, query_context);
break;
case PartitionCommand::MOVE_PARTITION:
{
switch (*command.move_destination_type)
{
case PartitionCommand::MoveDestinationType::DISK:
movePartitionToDisk(command.partition, command.move_destination_name, command.part, query_context);
break;
case PartitionCommand::MoveDestinationType::VOLUME:
movePartitionToVolume(command.partition, command.move_destination_name, command.part, query_context);
break;
case PartitionCommand::MoveDestinationType::TABLE:
checkPartitionCanBeDropped(command.partition);
String dest_database = query_context.resolveDatabase(command.to_database);
auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context);
movePartitionToTable(dest_storage, command.partition, query_context);
break;
}
}
break;
case PartitionCommand::REPLACE_PARTITION:
{
checkPartitionCanBeDropped(command.partition);
String from_database = query_context.resolveDatabase(command.from_database);
auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table}, query_context);
replacePartitionFrom(from_storage, command.partition, command.replace, query_context);
}
break;
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
current_command_results = freezePartition(command.partition, metadata_snapshot, command.with_name, query_context, lock);
}
break;
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
current_command_results = freezeAll(command.with_name, metadata_snapshot, query_context, lock);
}
break;
default:
IStorage::alterPartition(metadata_snapshot, commands, query_context); // should throw an exception.
}
for (auto & command_result : current_command_results)
command_result.command_type = command.typeToString();
result.insert(result.end(), current_command_results.begin(), current_command_results.end());
}
if (query_context.getSettingsRef().alter_partition_verbose_result)
return convertCommandsResultToSource(result);
return {};
}
ActionLock StorageMergeTree::stopMergesAndWait()
{
/// Asks to complete merges and does not allow them to start.
@ -1227,7 +1138,8 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, bool
PartitionCommandsResultInfo StorageMergeTree::attachPartition(
const ASTPtr & partition, bool attach_part, const Context & context)
const ASTPtr & partition, const StorageMetadataPtr & /* metadata_snapshot */,
bool attach_part, const Context & context)
{
PartitionCommandsResultInfo results;
PartsTemporaryRename renamed_parts(*this, "detached/");

View File

@ -72,11 +72,6 @@ public:
bool deduplicate,
const Context & context) override;
Pipe alterPartition(
const StorageMetadataPtr & /* metadata_snapshot */,
const PartitionCommands & commands,
const Context & context) override;
void mutate(const MutationCommands & commands, const Context & context) override;
/// Return introspection information about currently processing or recently processed mutations.
@ -192,11 +187,11 @@ private:
void clearOldMutations(bool truncate = false);
// Partition helpers
void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, const Context & context);
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, bool part, const Context & context);
void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, const Context & context) override;
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, const Context & context) override;
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context);
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context);
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context) override;
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context) override;
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
/// Update mutation entries after part mutation execution. May reset old
/// errors if mutation was successful. Otherwise update last_failed* fields

View File

@ -4171,93 +4171,6 @@ void StorageReplicatedMergeTree::alter(
}
}
Pipe StorageReplicatedMergeTree::alterPartition(
const StorageMetadataPtr & metadata_snapshot,
const PartitionCommands & commands,
const Context & query_context)
{
PartitionCommandsResultInfo result;
for (const PartitionCommand & command : commands)
{
PartitionCommandsResultInfo current_command_results;
switch (command.type)
{
case PartitionCommand::DROP_PARTITION:
if (command.part)
checkPartCanBeDropped(command.partition);
else
checkPartitionCanBeDropped(command.partition);
dropPartition(command.partition, command.detach, command.part, query_context);
break;
case PartitionCommand::DROP_DETACHED_PARTITION:
dropDetached(command.partition, command.part, query_context);
break;
case PartitionCommand::ATTACH_PARTITION:
current_command_results = attachPartition(command.partition, metadata_snapshot, command.part, query_context);
break;
case PartitionCommand::MOVE_PARTITION:
{
switch (*command.move_destination_type)
{
case PartitionCommand::MoveDestinationType::DISK:
movePartitionToDisk(command.partition, command.move_destination_name, command.part, query_context);
break;
case PartitionCommand::MoveDestinationType::VOLUME:
movePartitionToVolume(command.partition, command.move_destination_name, command.part, query_context);
break;
case PartitionCommand::MoveDestinationType::TABLE:
checkPartitionCanBeDropped(command.partition);
String dest_database = query_context.resolveDatabase(command.to_database);
auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context);
movePartitionToTable(dest_storage, command.partition, query_context);
break;
}
}
break;
case PartitionCommand::REPLACE_PARTITION:
{
checkPartitionCanBeDropped(command.partition);
String from_database = query_context.resolveDatabase(command.from_database);
auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table}, query_context);
replacePartitionFrom(from_storage, command.partition, command.replace, query_context);
}
break;
case PartitionCommand::FETCH_PARTITION:
fetchPartition(command.partition, metadata_snapshot, command.from_zookeeper_path, query_context);
break;
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
current_command_results = freezePartition(command.partition, metadata_snapshot, command.with_name, query_context, lock);
}
break;
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
current_command_results = freezeAll(command.with_name, metadata_snapshot, query_context, lock);
}
break;
}
for (auto & command_result : current_command_results)
command_result.command_type = command.typeToString();
result.insert(result.end(), current_command_results.begin(), current_command_results.end());
}
if (query_context.getSettingsRef().alter_partition_verbose_result)
return convertCommandsResultToSource(result);
return {};
}
/// If new version returns ordinary name, else returns part name containing the first and last month of the month
/// NOTE: use it in pair with getFakePartCoveringAllPartsInPartition(...)
static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info)

View File

@ -122,11 +122,6 @@ public:
void alter(const AlterCommands & commands, const Context & query_context, TableLockHolder & table_lock_holder) override;
Pipe alterPartition(
const StorageMetadataPtr & metadata_snapshot,
const PartitionCommands & commands,
const Context & query_context) override;
void mutate(const MutationCommands & commands, const Context & context) override;
void waitMutation(const String & znode_name, size_t mutations_sync) const;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
@ -558,11 +553,11 @@ private:
zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, bool detach);
// Partition helpers
void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, const Context & query_context);
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, const Context & query_context);
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context);
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & query_context);
void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, const Context & query_context);
void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, const Context & query_context) override;
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, const Context & query_context) override;
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context) override;
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & query_context) override;
void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, const Context & query_context) override;
/// Check granularity of already existing replicated table in zookeeper if it exists
/// return true if it's fixed