diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index c30aeae7584..dc940822abc 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -2753,6 +2753,96 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String throw Exception("Cannot move parts because moves are manually disabled", ErrorCodes::ABORTED); } +void MergeTreeData::fetchPartition(const ASTPtr & /*partition*/, const StorageMetadataPtr & /*metadata_snapshot*/, const String & /*from*/, const Context & /*query_context*/) +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "FETCH PARTITION is not supported by storage {}", getName()); +} + +Pipe MergeTreeData::alterPartition( + const StorageMetadataPtr & metadata_snapshot, + const PartitionCommands & commands, + const Context & query_context) +{ + PartitionCommandsResultInfo result; + for (const PartitionCommand & command : commands) + { + PartitionCommandsResultInfo current_command_results; + switch (command.type) + { + case PartitionCommand::DROP_PARTITION: + if (command.part) + checkPartCanBeDropped(command.partition); + else + checkPartitionCanBeDropped(command.partition); + dropPartition(command.partition, command.detach, command.part, query_context); + break; + + case PartitionCommand::DROP_DETACHED_PARTITION: + dropDetached(command.partition, command.part, query_context); + break; + + case PartitionCommand::ATTACH_PARTITION: + current_command_results = attachPartition(command.partition, metadata_snapshot, command.part, query_context); + break; + case PartitionCommand::MOVE_PARTITION: + { + switch (*command.move_destination_type) + { + case PartitionCommand::MoveDestinationType::DISK: + movePartitionToDisk(command.partition, command.move_destination_name, command.part, query_context); + break; + + case PartitionCommand::MoveDestinationType::VOLUME: + movePartitionToVolume(command.partition, command.move_destination_name, command.part, query_context); + break; + + case PartitionCommand::MoveDestinationType::TABLE: + checkPartitionCanBeDropped(command.partition); + String dest_database = query_context.resolveDatabase(command.to_database); + auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context); + movePartitionToTable(dest_storage, command.partition, query_context); + break; + } + } + break; + + case PartitionCommand::REPLACE_PARTITION: + { + checkPartitionCanBeDropped(command.partition); + String from_database = query_context.resolveDatabase(command.from_database); + auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table}, query_context); + replacePartitionFrom(from_storage, command.partition, command.replace, query_context); + } + break; + + case PartitionCommand::FETCH_PARTITION: + fetchPartition(command.partition, metadata_snapshot, command.from_zookeeper_path, query_context); + break; + + case PartitionCommand::FREEZE_PARTITION: + { + auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout); + current_command_results = freezePartition(command.partition, metadata_snapshot, command.with_name, query_context, lock); + } + break; + + case PartitionCommand::FREEZE_ALL_PARTITIONS: + { + auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout); + current_command_results = freezeAll(command.with_name, metadata_snapshot, query_context, lock); + } + break; + } + for (auto & command_result : current_command_results) + command_result.command_type = command.typeToString(); + result.insert(result.end(), current_command_results.begin(), current_command_results.end()); + } + + if (query_context.getSettingsRef().alter_partition_verbose_result) + return convertCommandsResultToSource(result); + + return {}; +} String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context & context) const { diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 6bfb1613754..0e3e5aff4f1 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -563,6 +563,11 @@ public: void checkPartCanBeDropped(const ASTPtr & part); + Pipe alterPartition( + const StorageMetadataPtr & metadata_snapshot, + const PartitionCommands & commands, + const Context & query_context) override; + size_t getColumnCompressedSize(const std::string & name) const { auto lock = lockParts(); @@ -868,8 +873,17 @@ protected: using MatcherFn = std::function; PartitionCommandsResultInfo freezePartitionsByMatcher(MatcherFn matcher, const StorageMetadataPtr & metadata_snapshot, const String & with_name, const Context & context); + // Partition helpers bool canReplacePartition(const DataPartPtr & src_part) const; + virtual void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, const Context & context) = 0; + virtual PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, const Context & context) = 0; + virtual void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context) = 0; + virtual void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context) = 0; + + /// Makes sense only for replicated tables + virtual void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, const Context & query_context); + void writePartLog( PartLogElement::Type type, const ExecutionStatus & execution_status, diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 3fb6f0cb1e8..48220a106c2 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -1062,95 +1062,6 @@ bool StorageMergeTree::optimize( return true; } -Pipe StorageMergeTree::alterPartition( - const StorageMetadataPtr & metadata_snapshot, - const PartitionCommands & commands, - const Context & query_context) -{ - PartitionCommandsResultInfo result; - for (const PartitionCommand & command : commands) - { - PartitionCommandsResultInfo current_command_results; - switch (command.type) - { - case PartitionCommand::DROP_PARTITION: - if (command.part) - checkPartCanBeDropped(command.partition); - else - checkPartitionCanBeDropped(command.partition); - dropPartition(command.partition, command.detach, command.part, query_context); - break; - - case PartitionCommand::DROP_DETACHED_PARTITION: - dropDetached(command.partition, command.part, query_context); - break; - - case PartitionCommand::ATTACH_PARTITION: - current_command_results = attachPartition(command.partition, command.part, query_context); - break; - - case PartitionCommand::MOVE_PARTITION: - { - switch (*command.move_destination_type) - { - case PartitionCommand::MoveDestinationType::DISK: - movePartitionToDisk(command.partition, command.move_destination_name, command.part, query_context); - break; - - case PartitionCommand::MoveDestinationType::VOLUME: - movePartitionToVolume(command.partition, command.move_destination_name, command.part, query_context); - break; - - case PartitionCommand::MoveDestinationType::TABLE: - checkPartitionCanBeDropped(command.partition); - String dest_database = query_context.resolveDatabase(command.to_database); - auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context); - movePartitionToTable(dest_storage, command.partition, query_context); - break; - } - - } - break; - - case PartitionCommand::REPLACE_PARTITION: - { - checkPartitionCanBeDropped(command.partition); - String from_database = query_context.resolveDatabase(command.from_database); - auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table}, query_context); - replacePartitionFrom(from_storage, command.partition, command.replace, query_context); - } - break; - - case PartitionCommand::FREEZE_PARTITION: - { - auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout); - current_command_results = freezePartition(command.partition, metadata_snapshot, command.with_name, query_context, lock); - } - break; - - case PartitionCommand::FREEZE_ALL_PARTITIONS: - { - auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout); - current_command_results = freezeAll(command.with_name, metadata_snapshot, query_context, lock); - } - break; - - default: - IStorage::alterPartition(metadata_snapshot, commands, query_context); // should throw an exception. - } - - for (auto & command_result : current_command_results) - command_result.command_type = command.typeToString(); - result.insert(result.end(), current_command_results.begin(), current_command_results.end()); - } - - if (query_context.getSettingsRef().alter_partition_verbose_result) - return convertCommandsResultToSource(result); - - return {}; -} - - ActionLock StorageMergeTree::stopMergesAndWait() { /// Asks to complete merges and does not allow them to start. @@ -1227,7 +1138,8 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, bool PartitionCommandsResultInfo StorageMergeTree::attachPartition( - const ASTPtr & partition, bool attach_part, const Context & context) + const ASTPtr & partition, const StorageMetadataPtr & /* metadata_snapshot */, + bool attach_part, const Context & context) { PartitionCommandsResultInfo results; PartsTemporaryRename renamed_parts(*this, "detached/"); diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 6cddfe77fee..e3796cb9d10 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -72,11 +72,6 @@ public: bool deduplicate, const Context & context) override; - Pipe alterPartition( - const StorageMetadataPtr & /* metadata_snapshot */, - const PartitionCommands & commands, - const Context & context) override; - void mutate(const MutationCommands & commands, const Context & context) override; /// Return introspection information about currently processing or recently processed mutations. @@ -192,11 +187,11 @@ private: void clearOldMutations(bool truncate = false); // Partition helpers - void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, const Context & context); - PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, bool part, const Context & context); + void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, const Context & context) override; + PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, const Context & context) override; - void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context); - void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context); + void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context) override; + void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context) override; bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; /// Update mutation entries after part mutation execution. May reset old /// errors if mutation was successful. Otherwise update last_failed* fields diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 93febf919c9..b93500000b5 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4171,93 +4171,6 @@ void StorageReplicatedMergeTree::alter( } } -Pipe StorageReplicatedMergeTree::alterPartition( - const StorageMetadataPtr & metadata_snapshot, - const PartitionCommands & commands, - const Context & query_context) -{ - PartitionCommandsResultInfo result; - for (const PartitionCommand & command : commands) - { - PartitionCommandsResultInfo current_command_results; - switch (command.type) - { - case PartitionCommand::DROP_PARTITION: - if (command.part) - checkPartCanBeDropped(command.partition); - else - checkPartitionCanBeDropped(command.partition); - dropPartition(command.partition, command.detach, command.part, query_context); - break; - - case PartitionCommand::DROP_DETACHED_PARTITION: - dropDetached(command.partition, command.part, query_context); - break; - - case PartitionCommand::ATTACH_PARTITION: - current_command_results = attachPartition(command.partition, metadata_snapshot, command.part, query_context); - break; - case PartitionCommand::MOVE_PARTITION: - { - switch (*command.move_destination_type) - { - case PartitionCommand::MoveDestinationType::DISK: - movePartitionToDisk(command.partition, command.move_destination_name, command.part, query_context); - break; - - case PartitionCommand::MoveDestinationType::VOLUME: - movePartitionToVolume(command.partition, command.move_destination_name, command.part, query_context); - break; - - case PartitionCommand::MoveDestinationType::TABLE: - checkPartitionCanBeDropped(command.partition); - String dest_database = query_context.resolveDatabase(command.to_database); - auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context); - movePartitionToTable(dest_storage, command.partition, query_context); - break; - } - } - break; - - case PartitionCommand::REPLACE_PARTITION: - { - checkPartitionCanBeDropped(command.partition); - String from_database = query_context.resolveDatabase(command.from_database); - auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table}, query_context); - replacePartitionFrom(from_storage, command.partition, command.replace, query_context); - } - break; - - case PartitionCommand::FETCH_PARTITION: - fetchPartition(command.partition, metadata_snapshot, command.from_zookeeper_path, query_context); - break; - - case PartitionCommand::FREEZE_PARTITION: - { - auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout); - current_command_results = freezePartition(command.partition, metadata_snapshot, command.with_name, query_context, lock); - } - break; - - case PartitionCommand::FREEZE_ALL_PARTITIONS: - { - auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout); - current_command_results = freezeAll(command.with_name, metadata_snapshot, query_context, lock); - } - break; - } - for (auto & command_result : current_command_results) - command_result.command_type = command.typeToString(); - result.insert(result.end(), current_command_results.begin(), current_command_results.end()); - } - - if (query_context.getSettingsRef().alter_partition_verbose_result) - return convertCommandsResultToSource(result); - - return {}; -} - - /// If new version returns ordinary name, else returns part name containing the first and last month of the month /// NOTE: use it in pair with getFakePartCoveringAllPartsInPartition(...) static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info) diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index e0eaacf5e71..3e6b809cfdf 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -122,11 +122,6 @@ public: void alter(const AlterCommands & commands, const Context & query_context, TableLockHolder & table_lock_holder) override; - Pipe alterPartition( - const StorageMetadataPtr & metadata_snapshot, - const PartitionCommands & commands, - const Context & query_context) override; - void mutate(const MutationCommands & commands, const Context & context) override; void waitMutation(const String & znode_name, size_t mutations_sync) const; std::vector getMutationsStatus() const override; @@ -558,11 +553,11 @@ private: zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, bool detach); // Partition helpers - void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, const Context & query_context); - PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, const Context & query_context); - void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context); - void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & query_context); - void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, const Context & query_context); + void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, const Context & query_context) override; + PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, const Context & query_context) override; + void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context) override; + void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & query_context) override; + void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, const Context & query_context) override; /// Check granularity of already existing replicated table in zookeeper if it exists /// return true if it's fixed