#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace zkutil { class ZooKeeper; using ZooKeeperPtr = std::shared_ptr; } namespace DB { struct ColumnSize; class MergeTreeData; struct FutureMergedMutatedPart; class IReservation; using ReservationPtr = std::unique_ptr; class IVolume; using VolumePtr = std::shared_ptr; class IMergeTreeReader; class IMergeTreeDataPartWriter; class MarkCache; class UncompressedCache; namespace ErrorCodes { } /// Description of the data part. class IMergeTreeDataPart : public std::enable_shared_from_this { public: static constexpr auto DATA_FILE_EXTENSION = ".bin"; using Checksums = MergeTreeDataPartChecksums; using Checksum = MergeTreeDataPartChecksums::Checksum; using ValueSizeMap = std::map; using MergeTreeReaderPtr = std::unique_ptr; using MergeTreeWriterPtr = std::unique_ptr; using ColumnSizeByName = std::unordered_map; using NameToNumber = std::unordered_map; using Type = MergeTreeDataPartType; IMergeTreeDataPart( const MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_, const VolumePtr & volume, const std::optional & relative_path, Type part_type_); IMergeTreeDataPart( MergeTreeData & storage_, const String & name_, const VolumePtr & volume, const std::optional & relative_path, Type part_type_); virtual MergeTreeReaderPtr getReader( const NamesAndTypesList & columns_, const StorageMetadataPtr & metadata_snapshot, const MarkRanges & mark_ranges, UncompressedCache * uncompressed_cache, MarkCache * mark_cache, const MergeTreeReaderSettings & reader_settings_, const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{}, const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = ReadBufferFromFileBase::ProfileCallback{}) const = 0; virtual MergeTreeWriterPtr getWriter( const NamesAndTypesList & columns_list, const StorageMetadataPtr & metadata_snapshot, const std::vector & indices_to_recalc, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & writer_settings, const MergeTreeIndexGranularity & computed_index_granularity = {}) const = 0; virtual bool isStoredOnDisk() const = 0; virtual bool supportsVerticalMerge() const { return false; } /// NOTE: Returns zeros if column files are not found in checksums. /// Otherwise return information about column size on disk. ColumnSize getColumnSize(const String & column_name, const IDataType & /* type */) const; /// Return information about column size on disk for all columns in part ColumnSize getTotalColumnsSize() const { return total_columns_size; } virtual String getFileNameForColumn(const NameAndTypePair & column) const = 0; virtual ~IMergeTreeDataPart(); using ColumnToSize = std::map; /// Populates columns_to_size map (compressed size). void accumulateColumnSizes(ColumnToSize & /* column_to_size */) const; Type getType() const { return part_type; } String getTypeName() const { return getType().toString(); } void setColumns(const NamesAndTypesList & new_columns); const NamesAndTypesList & getColumns() const { return columns; } /// Throws an exception if part is not stored in on-disk format. void assertOnDisk() const; void remove(bool keep_s3 = false) const; /// Initialize columns (from columns.txt if exists, or create from column files if not). /// Load checksums from checksums.txt if exists. Load index if required. void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency); String getMarksFileExtension() const { return index_granularity_info.marks_file_extension; } /// Generate the new name for this part according to `new_part_info` and min/max dates from the old name. /// This is useful when you want to change e.g. block numbers or the mutation version of the part. String getNewName(const MergeTreePartInfo & new_part_info) const; /// Returns column position in part structure or std::nullopt if it's missing in part. /// /// NOTE: Doesn't take column renames into account, if some column renames /// take place, you must take original name of column for this part from /// storage and pass it to this method. std::optional getColumnPosition(const String & column_name) const; /// Returns the name of a column with minimum compressed size (as returned by getColumnSize()). /// If no checksums are present returns the name of the first physically existing column. String getColumnNameWithMinimumCompressedSize(const StorageMetadataPtr & metadata_snapshot) const; bool contains(const IMergeTreeDataPart & other) const { return info.contains(other.info); } /// If the partition key includes date column (a common case), this function will return min and max values for that column. std::pair getMinMaxDate() const; /// otherwise, if the partition key includes dateTime column (also a common case), this function will return min and max values for that column. std::pair getMinMaxTime() const; bool isEmpty() const { return rows_count == 0; } /// Compute part block id for zero level part. Otherwise throws an exception. String getZeroLevelPartBlockID() const; const MergeTreeData & storage; String name; MergeTreePartInfo info; /// Part unique identifier. /// The intention is to use it for identifying cases where the same part is /// processed by multiple shards. UUID uuid = UUIDHelpers::Nil; VolumePtr volume; /// A directory path (relative to storage's path) where part data is actually stored /// Examples: 'detached/tmp_fetch_', 'tmp_', '' mutable String relative_path; MergeTreeIndexGranularityInfo index_granularity_info; size_t rows_count = 0; time_t modification_time = 0; /// When the part is removed from the working set. Changes once. mutable std::atomic remove_time { std::numeric_limits::max() }; /// If true, the destructor will delete the directory with the part. bool is_temp = false; /// If true it means that there are no ZooKeeper node for this part, so it should be deleted only from filesystem bool is_duplicate = false; /// Frozen by ALTER TABLE ... FREEZE ... It is used for information purposes in system.parts table. mutable std::atomic is_frozen {false}; /** * Part state is a stage of its lifetime. States are ordered and state of a part could be increased only. * Part state should be modified under data_parts mutex. * * Possible state transitions: * Temporary -> Precommitted: we are trying to commit a fetched, inserted or merged part to active set * Precommitted -> Outdated: we could not to add a part to active set and doing a rollback (for example it is duplicated part) * Precommitted -> Committed: we successfully committed a part to active dataset * Precommitted -> Outdated: a part was replaced by a covering part or DROP PARTITION * Outdated -> Deleting: a cleaner selected this part for deletion * Deleting -> Outdated: if an ZooKeeper error occurred during the deletion, we will retry deletion * Committed -> DeleteOnDestroy if part was moved to another disk */ enum class State { Temporary, /// the part is generating now, it is not in data_parts list PreCommitted, /// the part is in data_parts, but not used for SELECTs Committed, /// active data part, used by current and upcoming SELECTs Outdated, /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes Deleting, /// not active data part with identity refcounter, it is deleting right now by a cleaner DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor }; using TTLInfo = MergeTreeDataPartTTLInfo; using TTLInfos = MergeTreeDataPartTTLInfos; TTLInfos ttl_infos; /// Current state of the part. If the part is in working set already, it should be accessed via data_parts mutex void setState(State new_state) const; State getState() const; /// Returns name of state static String stateToString(State state); String stateString() const; String getNameWithState() const { return name + " (state " + stateString() + ")"; } /// Returns true if state of part is one of affordable_states bool checkState(const std::initializer_list & affordable_states) const { for (auto affordable_state : affordable_states) { if (state == affordable_state) return true; } return false; } /// Throws an exception if state of the part is not in affordable_states void assertState(const std::initializer_list & affordable_states) const; /// Primary key (correspond to primary.idx file). /// Always loaded in RAM. Contains each index_granularity-th value of primary key tuple. /// Note that marks (also correspond to primary key) is not always in RAM, but cached. See MarkCache.h. using Index = Columns; Index index; MergeTreePartition partition; /// Amount of rows between marks /// As index always loaded into memory MergeTreeIndexGranularity index_granularity; /// Index that for each part stores min and max values of a set of columns. This allows quickly excluding /// parts based on conditions on these columns imposed by a query. /// Currently this index is built using only columns required by partition expression, but in principle it /// can be built using any set of columns. struct MinMaxIndex { /// A direct product of ranges for each key column. See Storages/MergeTree/KeyCondition.cpp for details. std::vector hyperrectangle; bool initialized = false; public: MinMaxIndex() = default; /// For month-based partitioning. MinMaxIndex(DayNum min_date, DayNum max_date) : hyperrectangle(1, Range(min_date, true, max_date, true)) , initialized(true) { } void load(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path); void store(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path, Checksums & checksums) const; void store(const Names & column_names, const DataTypes & data_types, const DiskPtr & disk_, const String & part_path, Checksums & checksums) const; void update(const Block & block, const Names & column_names); void merge(const MinMaxIndex & other); }; MinMaxIndex minmax_idx; Checksums checksums; /// Columns with values, that all have been zeroed by expired ttl NameSet expired_columns; CompressionCodecPtr default_codec; /// For data in RAM ('index') UInt64 getIndexSizeInBytes() const; UInt64 getIndexSizeInAllocatedBytes() const; UInt64 getMarksCount() const; UInt64 getBytesOnDisk() const { return bytes_on_disk; } void setBytesOnDisk(UInt64 bytes_on_disk_) { bytes_on_disk = bytes_on_disk_; } size_t getFileSizeOrZero(const String & file_name) const; /// Returns path to part dir relatively to disk mount point String getFullRelativePath() const; /// Returns full path to part dir String getFullPath() const; /// Moves a part to detached/ directory and adds prefix to its name void renameToDetached(const String & prefix) const; /// Makes checks and move part to new directory /// Changes only relative_dir_name, you need to update other metadata (name, is_temp) explicitly virtual void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const; /// Makes clone of a part in detached/ directory via hard links virtual void makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const; /// Makes full clone of part in specified subdirectory (relative to storage data directory, e.g. "detached") on another disk void makeCloneOnDisk(const DiskPtr & disk, const String & directory_name) const; /// Checks that .bin and .mrk files exist. /// /// NOTE: Doesn't take column renames into account, if some column renames /// take place, you must take original name of column for this part from /// storage and pass it to this method. virtual bool hasColumnFiles(const NameAndTypePair & /* column */) const { return false; } /// Returns true if this part shall participate in merges according to /// settings of given storage policy. bool shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const; /// Calculate the total size of the entire directory with all the files static UInt64 calculateTotalSizeOnDisk(const DiskPtr & disk_, const String & from); void calculateColumnsSizesOnDisk(); String getRelativePathForPrefix(const String & prefix) const; /// Return set of metadat file names without checksums. For example, /// columns.txt or checksums.txt itself. NameSet getFileNamesWithoutChecksums() const; /// File with compression codec name which was used to compress part columns /// by default. Some columns may have their own compression codecs, but /// default will be stored in this file. static inline constexpr auto DEFAULT_COMPRESSION_CODEC_FILE_NAME = "default_compression_codec.txt"; static inline constexpr auto DELETE_ON_DESTROY_MARKER_FILE_NAME = "delete-on-destroy.txt"; static inline constexpr auto UUID_FILE_NAME = "uuid.txt"; /// Checks that all TTLs (table min/max, column ttls, so on) for part /// calculated. Part without calculated TTL may exist if TTL was added after /// part creation (using alter query with materialize_ttl setting). bool checkAllTTLCalculated(const StorageMetadataPtr & metadata_snapshot) const; /// Returns serialization for column according to files in which column is written in part. SerializationPtr getSerializationForColumn(const NameAndTypePair & column) const; /// Return some uniq string for file /// Required for distinguish different copies of the same part on S3 String getUniqueId() const; protected: /// Total size of all columns, calculated once in calcuateColumnSizesOnDisk ColumnSize total_columns_size; /// Size for each column, calculated once in calcuateColumnSizesOnDisk ColumnSizeByName columns_sizes; /// Total size on disk, not only columns. May not contain size of /// checksums.txt and columns.txt. 0 - if not counted; UInt64 bytes_on_disk{0}; /// Columns description. Cannot be changed, after part initialization. NamesAndTypesList columns; const Type part_type; void removeIfNeeded(); virtual void checkConsistency(bool require_part_metadata) const; void checkConsistencyBase() const; /// Fill each_columns_size and total_size with sizes from columns files on /// disk using columns and checksums. virtual void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const = 0; String getRelativePathForDetachedPart(const String & prefix) const; private: /// In compact parts order of columns is necessary NameToNumber column_name_to_position; /// Reads part unique identifier (if exists) from uuid.txt void loadUUID(); /// Reads columns names and types from columns.txt void loadColumns(bool require); /// If checksums.txt exists, reads file's checksums (and sizes) from it void loadChecksums(bool require); /// Loads marks index granularity into memory virtual void loadIndexGranularity(); /// Loads index file. void loadIndex(); /// Load rows count for this part from disk (for the newer storage format version). /// For the older format version calculates rows count from the size of a column with a fixed size. void loadRowsCount(); /// Loads ttl infos in json format from file ttl.txt. If file doesn't exists assigns ttl infos with all zeros void loadTTLInfos(); void loadPartitionAndMinMaxIndex(); /// Load default compression codec from file default_compression_codec.txt /// if it not exists tries to deduce codec from compressed column without /// any specifial compression. void loadDefaultCompressionCodec(); /// Found column without specific compression and return codec /// for this column with default parameters. CompressionCodecPtr detectDefaultCompressionCodec() const; mutable State state{State::Temporary}; }; using MergeTreeDataPartState = IMergeTreeDataPart::State; using MergeTreeDataPartPtr = std::shared_ptr; using MergeTreeMutableDataPartPtr = std::shared_ptr; bool isCompactPart(const MergeTreeDataPartPtr & data_part); bool isWidePart(const MergeTreeDataPartPtr & data_part); bool isInMemoryPart(const MergeTreeDataPartPtr & data_part); }