#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { /** See the description of the data structure in MergeTreeData. */ class StorageMergeTree final : public ext::shared_ptr_helper, public MergeTreeData { friend struct ext::shared_ptr_helper; public: void startup() override; void shutdown() override; ~StorageMergeTree() override; std::string getName() const override { return merging_params.getModeName() + "MergeTree"; } bool supportsParallelInsert() const override { return true; } bool supportsIndexForIn() const override { return true; } Pipes read( const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override; std::optional totalRows() const override; std::optional totalBytes() const override; BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override; /** Perform the next step in combining the parts. */ bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override; void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & context) override; void mutate(const MutationCommands & commands, const Context & context) override; /// Return introspection information about currently processing or recently processed mutations. std::vector getMutationsStatus() const override; CancellationCode killMutation(const String & mutation_id) override; void drop() override; void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override; void alter(const AlterCommands & commands, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override; void checkTableCanBeDropped() const override; void checkPartitionCanBeDropped(const ASTPtr & partition) override; ActionLock getActionLock(StorageActionBlockType action_type) override; CheckResults checkData(const ASTPtr & query, const Context & context) override; private: /// Mutex and condvar for synchronous mutations wait std::mutex mutation_wait_mutex; std::condition_variable mutation_wait_event; MergeTreeDataSelectExecutor reader; MergeTreeDataWriter writer; MergeTreeDataMergerMutator merger_mutator; /// For block numbers. SimpleIncrement increment{0}; /// For clearOldParts, clearOldTemporaryDirectories. AtomicStopwatch time_after_previous_cleanup; /// Mutex for parts currently processing in background /// merging (also with TTL), mutating or moving. mutable std::mutex currently_processing_in_background_mutex; /// Parts that currently participate in merge or mutation. /// This set have to be used with `currently_processing_in_background_mutex`. DataParts currently_merging_mutating_parts; std::map current_mutations_by_id; std::multimap current_mutations_by_version; std::atomic shutdown_called {false}; /// Task handler for merges, mutations and moves. BackgroundProcessingPool::TaskHandle merging_mutating_task_handle; BackgroundProcessingPool::TaskHandle moving_task_handle; void loadMutations(); /** Determines what parts should be merged and merges it. * If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query). * Returns true if merge is finished successfully. */ bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, String * out_disable_reason = nullptr); BackgroundProcessingPoolTaskResult movePartsTask(); /// Allocate block number for new mutation, write mutation to disk /// and into in-memory structures. Wake up merge-mutation task. Int64 startMutation(const MutationCommands & commands, String & mutation_file_name); /// Wait until mutation with version will finish mutation for all parts void waitForMutation(Int64 version, const String & file_name); /// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true. bool tryMutatePart(); BackgroundProcessingPoolTaskResult mergeMutateTask(); Int64 getCurrentMutationVersion( const DataPartPtr & part, std::lock_guard & /* currently_processing_in_background_mutex_lock */) const; void clearOldMutations(bool truncate = false); // Partition helpers void dropPartition(const ASTPtr & partition, bool detach, const Context & context); void attachPartition(const ASTPtr & partition, bool part, const Context & context); void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context); void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context); bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; /// Just checks versions of each active data part bool isMutationDone(Int64 mutation_version) const; friend class MergeTreeBlockOutputStream; friend class MergeTreeData; friend struct CurrentlyMergingPartsTagger; protected: /** Attach the table with the appropriate name, along the appropriate path (with / at the end), * (correctness of names and paths are not checked) * consisting of the specified columns. * * See MergeTreeData constructor for comments on parameters. */ StorageMergeTree( const StorageID & table_id_, const String & relative_data_path_, const StorageInMemoryMetadata & metadata, bool attach, Context & context_, const String & date_column_name, const MergingParams & merging_params_, std::unique_ptr settings_, bool has_force_restore_data_flag); MutationCommands getFirtsAlterMutationCommandsForPart(const DataPartPtr & part) const override; }; }