ClickHouse/src/Storages/MergeTree/MergeTreeData.h

934 lines
42 KiB
C++
Raw Normal View History

2014-03-09 17:36:01 +00:00
#pragma once
#include <Common/SimpleIncrement.h>
#include <Common/MultiVersion.h>
2019-05-03 02:00:57 +00:00
#include <Storages/IStorage.h>
2019-01-17 12:11:36 +00:00
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
2019-05-03 02:00:57 +00:00
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
2019-09-03 11:32:25 +00:00
#include <Storages/MergeTree/MergeList.h>
#include <Storages/DataDestinationType.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
2020-04-14 09:21:24 +00:00
#include <Processors/Merges/Algorithms/Graphite.h>
2019-10-10 16:30:30 +00:00
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
2019-01-20 18:43:49 +00:00
#include <Storages/IndicesDescription.h>
2019-09-05 13:12:29 +00:00
#include <Storages/MergeTree/MergeTreePartsMover.h>
2020-04-14 19:47:19 +00:00
#include <Storages/MergeTree/MergeTreeWriteAheadLog.h>
2019-09-03 11:32:25 +00:00
#include <Interpreters/PartLog.h>
#include <Disks/StoragePolicy.h>
#include <Interpreters/Aggregator.h>
2020-06-16 08:39:12 +00:00
#include <Storages/extractKeyExpressionList.h>
2020-07-13 17:27:52 +00:00
#include <Storages/PartitionCommands.h>
2014-04-02 13:45:39 +00:00
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/global_fun.hpp>
#include <boost/range/iterator_range_core.hpp>
2014-04-02 13:45:39 +00:00
2014-03-09 17:36:01 +00:00
namespace DB
{
2018-12-25 23:11:36 +00:00
class AlterCommands;
2019-09-05 13:12:29 +00:00
class MergeTreePartsMover;
2020-03-24 17:05:38 +00:00
class MutationCommands;
class Context;
2020-10-16 10:12:31 +00:00
struct JobAndPool;
2018-12-25 23:11:36 +00:00
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
2017-02-07 17:52:41 +00:00
/// Data structure for *MergeTree engines.
/// Merge tree is used for incremental sorting of data.
/// The table consists of several sorted parts.
/// During insertion new data is sorted according to the primary key and is written to the new part.
/// Parts are merged in the background according to a heuristic algorithm.
/// For each part the index file is created containing primary key values for every n-th row.
/// This allows efficient selection by primary key range predicate.
///
/// Additionally:
///
/// The date column is specified. For each part min and max dates are remembered.
/// Essentially it is an index too.
///
2017-09-13 16:22:04 +00:00
/// Data is partitioned by the value of the partitioning expression.
/// Parts belonging to different partitions are not merged - for the ease of administration (data sync and backup).
2017-02-07 17:52:41 +00:00
///
2017-09-13 16:22:04 +00:00
/// File structure of old-style month-partitioned tables (format_version = 0):
2017-02-07 17:52:41 +00:00
/// Part directory - / min-date _ max-date _ min-id _ max-id _ level /
/// Inside the part directory:
/// checksums.txt - contains the list of all files along with their sizes and checksums.
/// columns.txt - contains the list of all columns and their types.
/// primary.idx - contains the primary index.
/// [Column].bin - contains compressed column data.
/// [Column].mrk - marks, pointing to seek positions allowing to skip n * k rows.
///
2017-09-13 16:22:04 +00:00
/// File structure of tables with custom partitioning (format_version >= 1):
2020-05-21 09:00:44 +00:00
/// Part directory - / partition-id _ min-id _ max-id _ level /
2017-09-13 16:22:04 +00:00
/// Inside the part directory:
/// The same files as for month-partitioned tables, plus
/// count.txt - contains total number of rows in this part.
/// partition.dat - contains the value of the partitioning expression.
2020-01-17 12:24:27 +00:00
/// minmax_[Column].idx - MinMax indexes (see IMergeTreeDataPart::MinMaxIndex class) for the columns required by the partitioning expression.
2017-09-13 16:22:04 +00:00
///
2017-02-07 17:52:41 +00:00
/// Several modes are implemented. Modes determine additional actions during merge:
/// - Ordinary - don't do anything special
/// - Collapsing - collapse pairs of rows with the opposite values of sign_columns for the same values
/// of primary key (cf. CollapsingSortedBlockInputStream.h)
/// - Replacing - for all rows with the same primary key keep only the latest one. Or, if the version
/// column is set, keep the latest row with the maximal version.
/// - Summing - sum all numeric columns not contained in the primary key for all rows with the same primary key.
/// - Aggregating - merge columns containing aggregate function states for all rows with the same primary key.
/// - Graphite - performs coarsening of historical data for Graphite (a system for quantitative monitoring).
/// The MergeTreeData class contains a list of parts and the data structure parameters.
/// To read and modify the data use other classes:
/// - MergeTreeDataSelectExecutor
/// - MergeTreeDataWriter
/// - MergeTreeDataMergerMutator
2017-02-07 17:52:41 +00:00
2019-05-03 02:00:57 +00:00
class MergeTreeData : public IStorage
2014-03-09 17:36:01 +00:00
{
public:
/// Function to call if the part is suspected to contain corrupt data.
using BrokenPartCallback = std::function<void (const String &)>;
2019-10-10 16:30:30 +00:00
using DataPart = IMergeTreeDataPart;
using MutableDataPartPtr = std::shared_ptr<DataPart>;
using MutableDataPartsVector = std::vector<MutableDataPartPtr>;
/// After the DataPart is added to the working set, it cannot be changed.
using DataPartPtr = std::shared_ptr<const DataPart>;
2019-10-10 16:30:30 +00:00
using DataPartState = IMergeTreeDataPart::State;
using DataPartStates = std::initializer_list<DataPartState>;
using DataPartStateVector = std::vector<DataPartState>;
/// Auxiliary structure for index comparison: a (state, part info) pair that
/// LessStateDataPart orders by state first and then by info.
/// Keep in mind the lifetime of MergeTreePartInfo: `info` is stored by reference,
/// so the referenced object must stay alive for as long as this structure is used.
struct DataPartStateAndInfo
{
/// State component of the key.
DataPartState state;
/// Non-owning reference to the part info component of the key.
const MergeTreePartInfo & info;
};
/// Auxiliary structure for index comparison: a (state, partition ID) pair.
/// LessStateDataPart compares it against DataPartStateAndInfo using the state
/// and the partition_id of the part info.
struct DataPartStateAndPartitionID
{
/// State component of the key.
DataPartState state;
/// Partition ID component of the key; stored by value.
String partition_id;
};
STRONG_TYPEDEF(String, PartitionID)
2020-04-03 10:40:46 +00:00
/// Alter conversions which should be applied on the fly to a part. Built from
/// the most recent mutation commands for the part. Currently it contains only rename_map
/// (from the ALTER_RENAME command), because for all other types of alters
/// we can deduce the conversions for a part from the difference between
/// part->getColumns() and storage->getColumns().
2020-03-24 17:05:38 +00:00
struct AlterConversions
{
2020-04-03 10:40:46 +00:00
/// Rename map new_name -> old_name
2020-03-24 17:05:38 +00:00
std::unordered_map<String, String> rename_map;
2020-04-03 10:40:46 +00:00
bool isColumnRenamed(const String & new_name) const { return rename_map.count(new_name) > 0; }
String getColumnOldName(const String & new_name) const { return rename_map.at(new_name); }
2020-03-24 17:05:38 +00:00
};
struct LessDataPart
{
using is_transparent = void;
bool operator()(const DataPartPtr & lhs, const MergeTreePartInfo & rhs) const { return lhs->info < rhs; }
bool operator()(const MergeTreePartInfo & lhs, const DataPartPtr & rhs) const { return lhs < rhs->info; }
bool operator()(const DataPartPtr & lhs, const DataPartPtr & rhs) const { return lhs->info < rhs->info; }
bool operator()(const MergeTreePartInfo & lhs, const PartitionID & rhs) const { return lhs.partition_id < rhs.toUnderType(); }
bool operator()(const PartitionID & lhs, const MergeTreePartInfo & rhs) const { return lhs.toUnderType() < rhs.partition_id; }
};
struct LessStateDataPart
{
using is_transparent = void;
bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndInfo & rhs) const
{
return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info)
< std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info);
}
bool operator() (DataPartStateAndInfo info, const DataPartState & state) const
{
return static_cast<size_t>(info.state) < static_cast<size_t>(state);
}
bool operator() (const DataPartState & state, DataPartStateAndInfo info) const
{
return static_cast<size_t>(state) < static_cast<size_t>(info.state);
}
bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndPartitionID & rhs) const
{
return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info.partition_id)
< std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.partition_id);
}
bool operator() (const DataPartStateAndPartitionID & lhs, const DataPartStateAndInfo & rhs) const
{
return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.partition_id)
< std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info.partition_id);
}
};
using DataParts = std::set<DataPartPtr, LessDataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
using DataPartsLock = std::unique_lock<std::mutex>;
DataPartsLock lockParts() const { return DataPartsLock(data_parts_mutex); }
2020-02-13 14:42:48 +00:00
MergeTreeDataPartType choosePartType(size_t bytes_uncompressed, size_t rows_count) const;
2020-04-14 19:47:19 +00:00
MergeTreeDataPartType choosePartTypeOnDisk(size_t bytes_uncompressed, size_t rows_count) const;
2020-02-13 14:42:48 +00:00
/// After this method setColumns must be called
2019-11-21 16:10:22 +00:00
MutableDataPartPtr createPart(const String & name,
2019-11-22 12:51:00 +00:00
MergeTreeDataPartType type, const MergeTreePartInfo & part_info,
const VolumePtr & volume, const String & relative_path) const;
2019-11-21 16:10:22 +00:00
2020-06-03 09:51:23 +00:00
/// Create part, that already exists on filesystem.
/// After this method, 'loadColumnsChecksumsIndexes' must be called.
2019-11-21 16:10:22 +00:00
MutableDataPartPtr createPart(const String & name,
const VolumePtr & volume, const String & relative_path) const;
2019-11-21 16:10:22 +00:00
MutableDataPartPtr createPart(const String & name, const MergeTreePartInfo & part_info,
const VolumePtr & volume, const String & relative_path) const;
2019-11-21 16:10:22 +00:00
/// Auxiliary object to add a set of parts into the working set in two steps:
/// * First, as PreCommitted parts (the parts are ready, but not yet in the active set).
/// * Next, if commit() is called, the parts are added to the active set and the parts that are
/// covered by them are marked Outdated.
/// If neither commit() nor rollback() was called, the destructor rollbacks the operation.
class Transaction : private boost::noncopyable
{
public:
Transaction(MergeTreeData & data_) : data(data_) {}
DataPartsVector commit(MergeTreeData::DataPartsLock * acquired_parts_lock = nullptr);
void rollback();
2020-09-17 19:30:17 +00:00
/// Immediately remove parts from table's data_parts set and change part
/// state to temporary. Useful for new parts which not present in table.
void rollbackPartsToTemporaryState();
2018-05-13 00:24:23 +00:00
size_t size() const { return precommitted_parts.size(); }
bool isEmpty() const { return precommitted_parts.empty(); }
~Transaction()
{
try
{
rollback();
}
catch (...)
{
tryLogCurrentException("~MergeTreeData::Transaction");
}
}
private:
friend class MergeTreeData;
MergeTreeData & data;
DataParts precommitted_parts;
void clear() { precommitted_parts.clear(); }
};
using PathWithDisk = std::pair<String, DiskPtr>;
2019-07-30 17:24:40 +00:00
struct PartsTemporaryRename : private boost::noncopyable
{
2019-08-29 16:17:47 +00:00
PartsTemporaryRename(
const MergeTreeData & storage_,
const String & source_dir_)
: storage(storage_)
, source_dir(source_dir_)
{
}
2019-07-30 17:24:40 +00:00
void addPart(const String & old_name, const String & new_name);
2019-07-31 14:44:55 +00:00
/// Renames part from old_name to new_name
void tryRenameAll();
2019-07-30 17:24:40 +00:00
/// Renames all added parts from new_name to old_name if old name is not empty
~PartsTemporaryRename();
2019-07-31 14:44:55 +00:00
const MergeTreeData & storage;
2019-08-29 16:17:47 +00:00
const String source_dir;
2019-07-30 17:24:40 +00:00
std::vector<std::pair<String, String>> old_and_new_names;
std::unordered_map<String, PathWithDisk> old_part_name_to_path_and_disk;
2019-07-31 14:44:55 +00:00
bool renamed = false;
2019-07-30 17:24:40 +00:00
};
/// Parameters for various modes.
struct MergingParams
{
/// Merging mode. See above.
enum Mode
{
Ordinary = 0, /// Enum values are saved. Do not change them.
Collapsing = 1,
Summing = 2,
Aggregating = 3,
Replacing = 5,
Graphite = 6,
VersionedCollapsing = 7,
};
Mode mode;
/// For Collapsing and VersionedCollapsing mode.
String sign_column;
/// For Summing mode. If empty - columns_to_sum is determined automatically.
Names columns_to_sum;
/// For Replacing and VersionedCollapsing mode. Can be empty for Replacing.
String version_column;
/// For Graphite mode.
Graphite::Params graphite_params;
/// Check that needed columns are present and have correct types.
2020-07-06 14:33:31 +00:00
void check(const StorageInMemoryMetadata & metadata) const;
String getModeName() const;
};
/// Attach the table corresponding to the directory in full_path inside policy (must end with /), with the given columns.
/// Correctness of names and paths is not checked.
///
/// date_column_name - if not empty, the name of the Date column used for partitioning by month.
/// Otherwise, partition_by_ast is used for partitioning.
///
/// order_by_ast - a single expression or a tuple. It is used as a sorting key
/// (an ASTExpressionList used for sorting data in parts);
/// primary_key_ast - can be nullptr, an expression, or a tuple.
/// Used to determine an ASTExpressionList values of which are written in the primary.idx file
/// for one row in every `index_granularity` rows to speed up range queries.
2018-10-15 18:02:07 +00:00
/// Primary key must be a prefix of the sorting key;
/// If it is nullptr, then it will be determined from order_by_ast.
2018-10-15 18:02:07 +00:00
///
/// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
/// attach - whether the existing table is attached or the new table is created.
2019-12-04 16:06:55 +00:00
MergeTreeData(const StorageID & table_id_,
const String & relative_data_path_,
2020-06-09 17:28:29 +00:00
const StorageInMemoryMetadata & metadata_,
Context & context_,
const String & date_column_name,
const MergingParams & merging_params_,
2019-08-26 14:24:29 +00:00
std::unique_ptr<MergeTreeSettings> settings_,
bool require_part_metadata_,
bool attach,
BrokenPartCallback broken_part_callback_ = [](const String &){});
2019-12-26 18:17:05 +00:00
StoragePolicyPtr getStoragePolicy() const override;
2019-05-03 02:00:57 +00:00
bool supportsPrewhere() const override { return true; }
2019-05-03 02:00:57 +00:00
/// Returns true for the Collapsing, Summing, Aggregating, Replacing and
/// VersionedCollapsing merging modes, and false for every other mode.
bool supportsFinal() const override
{
    switch (merging_params.mode)
    {
        case MergingParams::Collapsing:
        case MergingParams::Summing:
        case MergingParams::Aggregating:
        case MergingParams::Replacing:
        case MergingParams::VersionedCollapsing:
            return true;
        case MergingParams::Ordinary:
        case MergingParams::Graphite:
            return false;
    }
    /// A value outside the enumerators listed above.
    return false;
}
2019-08-07 15:21:45 +00:00
bool supportsSettings() const override { return true; }
NamesAndTypesList getVirtuals() const override;
2019-08-07 15:21:45 +00:00
2020-06-27 20:13:16 +00:00
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context &, const StorageMetadataPtr & metadata_snapshot) const override;
2019-05-04 03:45:58 +00:00
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void loadDataParts(bool skip_sanity_checks);
String getLogName() const { return log_name; }
2019-05-04 03:45:58 +00:00
Int64 getMaxBlockNumber() const;
/// Returns a copy of the list so that the caller shouldn't worry about locks.
DataParts getDataParts(const DataPartStates & affordable_states) const;
/// Returns sorted list of the parts with specified states
/// out_states will contain snapshot of each part state
DataPartsVector getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const;
/// Returns absolutely all parts (and snapshot of their states)
DataPartsVector getAllDataPartsVector(DataPartStateVector * out_states = nullptr) const;
/// Returns all detached parts
2019-07-31 14:44:55 +00:00
DetachedPartsInfo getDetachedParts() const;
2019-07-26 20:04:45 +00:00
void validateDetachedPartName(const String & name) const;
2019-07-30 17:24:40 +00:00
void dropDetached(const ASTPtr & partition, bool part, const Context & context);
2019-07-30 19:11:15 +00:00
MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
const Context & context, PartsTemporaryRename & renamed_parts);
/// Returns Committed parts
DataParts getDataParts() const;
DataPartsVector getDataPartsVector() const;
/// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr.
DataPartPtr getActiveContainingPart(const String & part_name) const;
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info) const;
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock) const;
2019-08-16 15:57:19 +00:00
/// Swap part with its identical copy (possibly with another path on another disk).
/// If original part is not active or doesn't exist exception will be thrown.
2019-08-19 14:40:12 +00:00
void swapActivePart(MergeTreeData::DataPartPtr part_copy);
2019-06-07 19:16:42 +00:00
/// Returns all parts in specified partition
DataPartsVector getDataPartsVectorInPartition(DataPartState state, const String & partition_id);
/// Returns the part with the given name and state or nullptr if no such part.
DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states);
DataPartPtr getPartIfExists(const MergeTreePartInfo & part_info, const DataPartStates & valid_states);
/// Total size of active parts in bytes.
size_t getTotalActiveSizeInBytes() const;
size_t getTotalActiveSizeInRows() const;
size_t getPartsCount() const;
size_t getMaxPartsCountForPartition() const;
/// Get min value of part->info.getDataVersion() for all active parts.
/// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition.
std::optional<Int64> getMinPartDataVersion() const;
/// If the table contains too many active parts, sleep for a while to give them time to merge.
/// If until is non-null, wake up from the sleep earlier if the event happened.
void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const;
void throwInsertIfNeeded() const;
/// Renames temporary part to a permanent part and adds it to the parts set.
/// It is assumed that the part does not intersect with existing parts.
2020-08-08 00:47:03 +00:00
/// If increment != nullptr, part index is determined using increment. Otherwise part index remains unchanged.
/// If out_transaction != nullptr, adds the part in the PreCommitted state (the part will be added to the
/// active set later with out_transaction->commit()).
/// Else, commits the part immediately.
/// Returns true if part was added. Returns false if part is covered by bigger part.
bool renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
/// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
/// Returns all parts covered by the added part (in ascending order).
/// If out_transaction == nullptr, marks covered parts as Outdated.
DataPartsVector renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
/// Low-level version of previous one, doesn't lock mutex
bool renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, DataPartsLock & lock,
DataPartsVector * out_covered_parts = nullptr);
2020-09-17 19:30:17 +00:00
/// Remove parts from the working set immediately (without waiting for the background
/// process). Transfers part state to temporary. Has very limited usage: only
/// for new parts which are not already present in the table.
void removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove);
/// Removes parts from the working set parts.
/// Parts in remove must already be in data_parts with PreCommitted, Committed, or Outdated states.
/// If clear_without_timeout is true, the parts will be deleted at once, or during the next call to
/// clearOldParts (ignoring old_parts_lifetime).
void removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock * acquired_lock = nullptr);
void removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & acquired_lock);
/// Removes all parts from the working set parts
/// for which (partition_id = drop_range.partition_id && min_block >= drop_range.min_block && max_block <= drop_range.max_block).
/// If a part intersecting drop_range.max_block is found, an exception will be thrown.
/// Used in REPLACE PARTITION command;
DataPartsVector removePartsInRangeFromWorkingSet(const MergeTreePartInfo & drop_range, bool clear_without_timeout,
bool skip_intersecting_parts, DataPartsLock & lock);
/// Renames the part to detached/<prefix>_<part> and removes it from working set.
void removePartsFromWorkingSetAndCloneToDetached(const DataPartsVector & parts, bool clear_without_timeout, const String & prefix = "");
/// Renames the part to detached/<prefix>_<part> and removes it from data_parts,
/// so it will not be deleted in clearOldParts.
/// If restore_covered is true, adds to the working set inactive parts, which were merged into the deleted part.
void forgetPartAndMoveToDetached(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false);
/// If the part is Obsolete and not used by anybody else, immediately delete it from filesystem and remove from memory.
void tryRemovePartImmediately(DataPartPtr && part);
2020-01-13 14:45:13 +00:00
/// Returns old inactive parts that can be deleted. At the same time removes them from the list of parts but not from the disk.
/// If 'force' - don't wait for old_parts_lifetime.
2020-01-10 09:46:24 +00:00
DataPartsVector grabOldParts(bool force = false);
/// Reverts the changes made by grabOldParts(), parts should be in Deleting state.
void rollbackDeletingParts(const DataPartsVector & parts);
/// Removes parts from data_parts, they should be in Deleting state
void removePartsFinally(const DataPartsVector & parts);
/// Delete irrelevant parts from memory and disk.
2020-01-13 14:45:13 +00:00
/// If 'force' - don't wait for old_parts_lifetime.
2020-01-10 09:46:24 +00:00
void clearOldPartsFromFilesystem(bool force = false);
2019-08-11 19:14:42 +00:00
void clearPartsFromFilesystem(const DataPartsVector & parts);
/// Delete WAL files containing parts that are all already stored on disk.
void clearOldWriteAheadLogs();
/// Delete all directories which names begin with "tmp"
/// Set non-negative parameter value to override MergeTreeSettings temporary_directories_lifetime
2020-06-23 08:04:43 +00:00
/// Must be called with locked lockForShare() because it uses relative_data_path.
void clearOldTemporaryDirectories(ssize_t custom_directories_lifetime_seconds = -1);
/// After the call to dropAllData() no method can be called.
/// Deletes the data directory and flushes the uncompressed blocks cache and the marks cache.
void dropAllData();
/// Drop data directories if they are empty. It is safe to call this method if table creation was unsuccessful.
void dropIfEmpty();
2020-06-23 08:04:43 +00:00
/// Moves the entire data directory. Flushes the uncompressed blocks cache
/// and the marks cache. Must be called with locked lockExclusively()
/// because it changes relative_data_path.
2020-04-07 14:05:51 +00:00
void rename(const String & new_table_path, const StorageID & new_table_id) override;
/// Check if the ALTER can be performed:
/// - all needed columns are present.
/// - all type conversions can be done.
2019-01-16 16:53:38 +00:00
/// - columns corresponding to primary key, indices, sign, sampling expression and date are not affected.
/// If something is wrong, throws an exception.
2020-06-10 11:16:31 +00:00
void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings) const override;
2020-07-14 08:19:39 +00:00
/// Checks that partition name in all commands is valid
2020-07-13 17:27:52 +00:00
void checkAlterPartitionIsPossible(const PartitionCommands & commands, const StorageMetadataPtr & metadata_snapshot, const Settings & settings) const override;
/// Change MergeTreeSettings
void changeSettings(
2020-07-10 08:13:21 +00:00
const ASTPtr & new_settings,
TableLockHolder & table_lock_holder);
2019-08-06 13:04:29 +00:00
2018-11-01 10:35:50 +00:00
/// Freezes all parts.
2020-07-28 15:10:36 +00:00
PartitionCommandsResultInfo freezeAll(
2020-07-10 08:13:21 +00:00
const String & with_name,
const StorageMetadataPtr & metadata_snapshot,
const Context & context,
TableLockHolder & table_lock_holder);
2018-11-01 10:35:50 +00:00
/// Should be called if part data is suspected to be corrupted.
2018-10-17 03:13:00 +00:00
/// Forwards the part name to the broken-part callback of this storage.
void reportBrokenPart(const String & name) const { broken_part_callback(name); }
2020-06-16 08:39:12 +00:00
/// TODO (alesap) Duplicate of the free function DB::extractKeyExpressionList,
/// kept only for compatibility. Must be removed.
static ASTPtr extractKeyExpressionList(const ASTPtr & node) { return DB::extractKeyExpressionList(node); }
2020-01-22 19:52:55 +00:00
/** Create local backup (snapshot) for parts with specified prefix.
* Backup is created in directory clickhouse_dir/shadow/i/, where i - incremental number,
* or if 'with_name' is specified - backup is created in directory with specified name.
*/
2020-07-28 15:10:36 +00:00
PartitionCommandsResultInfo freezePartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & with_name, const Context & context, TableLockHolder & table_lock_holder);
public:
/// Moves partition to specified Disk
2019-08-20 09:59:19 +00:00
void movePartitionToDisk(const ASTPtr & partition, const String & name, bool moving_part, const Context & context);
2019-07-18 15:19:03 +00:00
/// Moves partition to specified Volume
2019-08-20 09:59:19 +00:00
void movePartitionToVolume(const ASTPtr & partition, const String & name, bool moving_part, const Context & context);
2020-07-13 16:19:08 +00:00
void checkPartitionCanBeDropped(const ASTPtr & partition) override;
void checkPartCanBeDropped(const ASTPtr & part);
size_t getColumnCompressedSize(const std::string & name) const
{
2019-03-28 19:58:41 +00:00
auto lock = lockParts();
const auto it = column_sizes.find(name);
return it == std::end(column_sizes) ? 0 : it->second.data_compressed;
}
/// Returns a copy of column_sizes; the copy is made while the parts lock is held.
ColumnSizeByName getColumnSizes() const override
{
    const auto parts_lock = lockParts();
    return column_sizes;
}
/// For ATTACH/DETACH/DROP PARTITION.
2020-07-13 17:27:52 +00:00
String getPartitionIDFromQuery(const ASTPtr & ast, const Context & context) const;
/// Extracts MergeTreeData of other *MergeTree* storage
/// and checks that their structure is suitable for ALTER TABLE ATTACH PARTITION FROM.
/// Tables structure should be locked.
MergeTreeData & checkStructureAndGetMergeTreeData(const StoragePtr & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const;
MergeTreeData & checkStructureAndGetMergeTreeData(IStorage & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const;
MergeTreeData::MutableDataPartPtr cloneAndLoadDataPartOnSameDisk(
2020-06-26 11:30:23 +00:00
const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix, const MergeTreePartInfo & dst_part_info, const StorageMetadataPtr & metadata_snapshot);
2019-05-03 02:00:57 +00:00
virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
/// Returns true if table can create new parts with adaptive granularity
/// Has additional constraint in replicated version
virtual bool canUseAdaptiveGranularity() const
2019-06-19 14:46:06 +00:00
{
2019-08-26 14:24:29 +00:00
const auto settings = getSettings();
2019-08-13 10:29:31 +00:00
return settings->index_granularity_bytes != 0 &&
(settings->enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts);
2019-06-19 14:46:06 +00:00
}
2019-08-28 18:23:20 +00:00
/// Get constant pointer to storage settings.
/// Copy this pointer into your scope and you will
/// get consistent settings.
/// Returns whatever storage_settings.get() currently yields.
MergeTreeSettingsPtr getSettings() const { return storage_settings.get(); }
2019-06-19 16:16:13 +00:00
2020-04-08 08:41:13 +00:00
String getRelativeDataPath() const { return relative_data_path; }
2019-09-06 15:09:20 +00:00
/// Get table path on disk
2019-11-27 09:39:44 +00:00
String getFullPathOnDisk(const DiskPtr & disk) const;
2019-09-06 15:09:20 +00:00
/// Get disk where part is located.
/// `additional_path` can be set if part is not located directly in table data path (e.g. 'detached/')
DiskPtr getDiskForPart(const String & part_name, const String & additional_path = "") const;
2019-09-06 15:09:20 +00:00
/// Get full path for part. Uses getDiskForPart and returns the full relative path.
/// `additional_path` can be set if part is not located directly in table data path (e.g. 'detached/')
std::optional<String> getFullRelativePathForPart(const String & part_name, const String & additional_path = "") const;
2020-11-01 17:38:43 +00:00
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override;
using PathsWithDisks = std::vector<PathWithDisk>;
PathsWithDisks getRelativeDataPathsWithDisks() const;
2019-12-05 08:05:07 +00:00
/// Reserves space at least 1MB.
ReservationPtr reserveSpace(UInt64 expected_size) const;
2019-12-05 08:05:07 +00:00
/// Reserves space at least 1MB on specific disk or volume.
2020-03-18 00:57:00 +00:00
static ReservationPtr reserveSpace(UInt64 expected_size, SpacePtr space);
static ReservationPtr tryReserveSpace(UInt64 expected_size, SpacePtr space);
2019-12-05 08:05:07 +00:00
/// Reserves space at least 1MB preferring best destination according to `ttl_infos`.
2020-06-17 13:39:26 +00:00
ReservationPtr reserveSpacePreferringTTLRules(
2020-10-05 16:41:46 +00:00
const StorageMetadataPtr & metadata_snapshot,
2020-06-17 13:39:26 +00:00
UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
size_t min_volume_index = 0,
bool is_insert = false) const;
2020-06-17 13:39:26 +00:00
ReservationPtr tryReserveSpacePreferringTTLRules(
2020-10-05 16:41:46 +00:00
const StorageMetadataPtr & metadata_snapshot,
2020-06-17 13:39:26 +00:00
UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
size_t min_volume_index = 0,
bool is_insert = false) const;
2020-06-17 13:39:26 +00:00
/// Choose disk with max available free space
/// Reserves 0 bytes
ReservationPtr makeEmptyReservationOnLargestDisk() { return getStoragePolicy()->makeEmptyReservationOnLargestDisk(); }
2020-04-03 10:40:46 +00:00
/// Return alter conversions for part which must be applied on fly.
2020-03-24 17:05:38 +00:00
AlterConversions getAlterConversionsForPart(const MergeTreeDataPartPtr part) const;
/// Returns destination disk or volume for the TTL rule according to current storage policy
/// 'is_insert' - is TTL move performed on new data part insert.
2020-09-18 15:41:14 +00:00
SpacePtr getDestinationForMoveTTL(const TTLDescription & move_ttl, bool is_insert = false) const;
2020-05-25 17:07:14 +00:00
/// Checks if the given part already belongs to the destination disk or volume for the
/// TTL rule.
2020-05-28 15:33:44 +00:00
bool isPartInTTLDestination(const TTLDescription & ttl, const IMergeTreeDataPart & part) const;
2020-03-24 17:05:38 +00:00
2020-09-04 10:08:09 +00:00
/// Get count of total merges with TTL in MergeList (system.merges) for all
/// tables (not only current table).
/// Method is cheap and doesn't require any locks.
size_t getTotalMergesWithTTLInMergeList() const;
2020-04-14 19:47:19 +00:00
using WriteAheadLogPtr = std::shared_ptr<MergeTreeWriteAheadLog>;
2020-06-30 18:47:12 +00:00
WriteAheadLogPtr getWriteAheadLog();
2020-04-14 19:47:19 +00:00
MergeTreeDataFormatVersion format_version;
2019-11-04 15:43:13 +00:00
Context & global_context;
/// Merging params - what additional actions to perform during merge.
const MergingParams merging_params;
bool is_custom_partitioned = false;
ExpressionActionsPtr minmax_idx_expr;
Names minmax_idx_columns;
DataTypes minmax_idx_column_types;
Int64 minmax_idx_date_column_pos = -1; /// In a common case minmax index includes a date column.
Int64 minmax_idx_time_column_pos = -1; /// In other cases, minmax index often includes a dateTime column.
ExpressionActionsPtr getPrimaryKeyAndSkipIndicesExpression(const StorageMetadataPtr & metadata_snapshot) const;
ExpressionActionsPtr getSortingKeyAndSkipIndicesExpression(const StorageMetadataPtr & metadata_snapshot) const;
2020-09-07 07:59:14 +00:00
/// Get compression codec for part according to TTL rules and <compression>
/// section from config.xml.
2020-08-31 19:50:42 +00:00
CompressionCodecPtr getCompressionCodecForPart(size_t part_size_compressed, const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t current_time) const;
2020-01-22 19:52:55 +00:00
/// Limiting parallel sends per one table, used in DataPartsExchange
std::atomic_uint current_table_sends {0};
/// For generating names of temporary parts during insertion.
SimpleIncrement insert_increment;
2019-06-19 14:46:06 +00:00
bool has_non_adaptive_index_granularity_parts = false;
2019-09-05 13:12:29 +00:00
/// Parts that are currently being moved from one disk/volume to another.
/// This set has to be used with `currently_processing_in_background_mutex`.
/// NOTE(review): the mutex declared for this set right below is `moving_parts_mutex`; confirm which one is meant.
/// Moving may conflict with merges and mutations, but this is OK, because
/// if we decide to move some part to another disk, then we
/// assuredly will choose this disk for containing part, which will appear
/// as result of merge or mutation.
DataParts currently_moving_parts;
2019-09-06 15:09:20 +00:00
/// Mutex for currently_moving_parts
2019-09-05 15:53:23 +00:00
mutable std::mutex moving_parts_mutex;
2019-08-13 08:35:49 +00:00
2020-10-16 10:12:31 +00:00
/// Return main processing background job, like merge/mutate/fetch and so on
virtual std::optional<JobAndPool> getDataProcessingJob() = 0;
/// Return job to move parts between disks/volumes and so on.
std::optional<JobAndPool> getDataMovingJob();
2020-10-14 07:22:48 +00:00
bool areBackgroundMovesNeeded() const;
2020-10-13 14:25:42 +00:00
2019-05-03 02:00:57 +00:00
protected:
2019-08-13 08:35:49 +00:00
2019-10-10 16:30:30 +00:00
friend class IMergeTreeDataPart;
friend class MergeTreeDataMergerMutator;
friend struct ReplicatedMergeTreeTableMetadata;
2019-05-03 02:00:57 +00:00
friend class StorageReplicatedMergeTree;
bool require_part_metadata;
/// Relative data path. It changes during rename for ordinary databases;
/// use it under lockForShare if a rename is possible.
String relative_data_path;
/// Current column sizes in compressed and uncompressed form.
ColumnSizeByName column_sizes;
/// Engine-specific methods
BrokenPartCallback broken_part_callback;
String log_name;
2020-05-30 21:57:37 +00:00
Poco::Logger * log;
2019-08-26 14:24:29 +00:00
/// Storage settings.
/// Use get and set to receive readonly versions.
MultiVersion<MergeTreeSettings> storage_settings;
/// Work with data parts
struct TagByInfo{};
struct TagByStateAndInfo{};
/// Key extractor used by the index tagged with TagByInfo: yields a reference to the part's info.
static const MergeTreePartInfo & dataPartPtrToInfo(const DataPartPtr & part)
{
    const auto & part_info = part->info;
    return part_info;
}
/// Key extractor used by the index tagged with TagByStateAndInfo: yields the (state, info) key of the part.
static DataPartStateAndInfo dataPartPtrToStateAndInfo(const DataPartPtr & part)
{
    return DataPartStateAndInfo{part->state, part->info};
}
/// Container of data parts (DataPartPtr elements) that exposes two ordered unique
/// indices over the same set of elements. Keys are not stored separately: each index
/// computes its key from the element through a static extractor function.
using DataPartsIndexes = boost::multi_index_container<DataPartPtr,
boost::multi_index::indexed_by<
/// Index by Info
/// (tag TagByInfo, key obtained with dataPartPtrToInfo).
boost::multi_index::ordered_unique<
boost::multi_index::tag<TagByInfo>,
boost::multi_index::global_fun<const DataPartPtr &, const MergeTreePartInfo &, dataPartPtrToInfo>
>,
/// Index by (State, Info), is used to obtain ordered slices of parts with the same state
/// (tag TagByStateAndInfo, key obtained with dataPartPtrToStateAndInfo, ordered by LessStateDataPart).
boost::multi_index::ordered_unique<
boost::multi_index::tag<TagByStateAndInfo>,
boost::multi_index::global_fun<const DataPartPtr &, DataPartStateAndInfo, dataPartPtrToStateAndInfo>,
LessStateDataPart
>
>
>;
/// Current set of data parts.
mutable std::mutex data_parts_mutex;
DataPartsIndexes data_parts_indexes;
DataPartsIndexes::index<TagByInfo>::type & data_parts_by_info;
DataPartsIndexes::index<TagByStateAndInfo>::type & data_parts_by_state_and_info;
2019-09-05 13:12:29 +00:00
MergeTreePartsMover parts_mover;
using DataPartIteratorByInfo = DataPartsIndexes::index<TagByInfo>::type::iterator;
using DataPartIteratorByStateAndInfo = DataPartsIndexes::index<TagByStateAndInfo>::type::iterator;
/// Returns the range of the (State, Info) index bounded by lower_bound/upper_bound of `state`,
/// i.e. the slice of parts that are in the given state.
boost::iterator_range<DataPartIteratorByStateAndInfo> getDataPartsStateRange(DataPartState state) const
{
    const LessStateDataPart less_by_state{};
    return {
        data_parts_by_state_and_info.lower_bound(state, less_by_state),
        data_parts_by_state_and_info.upper_bound(state, less_by_state)};
}
/// Returns the range of the Info index bounded by lower_bound/upper_bound of the
/// partition key built from `partition_id`, i.e. the parts of that partition.
boost::iterator_range<DataPartIteratorByInfo> getDataPartsPartitionRange(const String & partition_id) const
{
    /// Build the lookup key and the comparator once and reuse them for both bounds.
    const PartitionID partition_key(partition_id);
    const LessDataPart less_by_partition{};
    return {
        data_parts_by_info.lower_bound(partition_key, less_by_partition),
        data_parts_by_info.upper_bound(partition_key, less_by_partition)};
}
/// Builds a functor that assigns `state` to the part it is invoked with.
/// It is passed as the modifier to multi_index_container::modify() by modifyPartState().
static decltype(auto) getStateModifier(DataPartState state)
{
    return [target_state = state] (const DataPartPtr & data_part) { data_part->state = target_state; };
}
/// Assigns `state` to the part pointed to by `it` (an iterator of the (State, Info) index)
/// and lets the container re-position the part in its indices.
/// Throws LOGICAL_ERROR if the container fails to re-position the part.
void modifyPartState(DataPartIteratorByStateAndInfo it, DataPartState state)
{
    /// multi_index_container::modify() erases the element when re-positioning fails, which
    /// invalidates `it`. Hold our own reference to the part so that the error message
    /// is not built from an erased element.
    const DataPartPtr part_holder = *it;
    if (!data_parts_by_state_and_info.modify(it, getStateModifier(state)))
        throw Exception("Can't modify " + part_holder->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
}
/// Assigns `state` to the part pointed to by `it` (an iterator of the Info index).
/// The iterator is projected onto the (State, Info) index, through which the modification is done.
/// Throws LOGICAL_ERROR if the container fails to re-position the part.
void modifyPartState(DataPartIteratorByInfo it, DataPartState state)
{
    /// multi_index_container::modify() erases the element when re-positioning fails, which
    /// invalidates `it`. Hold our own reference to the part so that the error message
    /// is not built from an erased element.
    const DataPartPtr part_holder = *it;
    if (!data_parts_by_state_and_info.modify(data_parts_indexes.project<TagByStateAndInfo>(it), getStateModifier(state)))
        throw Exception("Can't modify " + part_holder->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
}
void modifyPartState(const DataPartPtr & part, DataPartState state)
{
auto it = data_parts_by_info.find(part->info);
if (it == data_parts_by_info.end() || (*it).get() != part.get())
throw Exception("Part " + part->name + " doesn't exist", ErrorCodes::LOGICAL_ERROR);
if (!data_parts_by_state_and_info.modify(data_parts_indexes.project<TagByStateAndInfo>(it), getStateModifier(state)))
throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
}
/// Used to serialize calls to grabOldParts.
std::mutex grab_old_parts_mutex;
/// The same for clearOldTemporaryDirectories.
std::mutex clear_old_temporary_directories_mutex;
void checkProperties(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & old_metadata, bool attach = false) const;
2020-06-10 11:16:31 +00:00
void setProperties(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & old_metadata, bool attach = false);
void checkPartitionKeyAndInitMinMax(const KeyDescription & new_partition_key);
void checkTTLExpressions(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & old_metadata) const;
2020-04-02 16:11:10 +00:00
2020-04-22 06:22:14 +00:00
void checkStoragePolicy(const StoragePolicyPtr & new_storage_policy) const;
/// Calculates column sizes in compressed form for the current state of data_parts. Call with data_parts mutex locked.
void calculateColumnSizesImpl();
/// Adds or subtracts the contribution of the part to compressed column sizes.
void addPartContributionToColumnSizes(const DataPartPtr & part);
void removePartContributionToColumnSizes(const DataPartPtr & part);
/// If there is no part in the partition with ID `partition_id`, returns empty ptr. Should be called under the lock.
2020-07-13 17:27:52 +00:00
DataPartPtr getAnyPartInPartition(const String & partition_id, DataPartsLock & data_parts_lock) const;
/// Return parts in the Committed set that are covered by the new_part_info or the part that covers it.
/// Will check that the new part doesn't already exist and that it doesn't intersect existing part.
DataPartsVector getActivePartsToReplace(
const MergeTreePartInfo & new_part_info,
const String & new_part_name,
DataPartPtr & out_covering_part,
DataPartsLock & data_parts_lock) const;
2018-03-16 06:51:37 +00:00
/// Checks whether the column is in the primary key, possibly wrapped in a chain of functions with single argument.
2020-06-17 12:39:20 +00:00
bool isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node, const StorageMetadataPtr & metadata_snapshot) const;
/// Common part for |freezePartition()| and |freezeAll()|.
using MatcherFn = std::function<bool(const DataPartPtr &)>;
2020-07-28 15:10:36 +00:00
PartitionCommandsResultInfo freezePartitionsByMatcher(MatcherFn matcher, const StorageMetadataPtr & metadata_snapshot, const String & with_name, const Context & context);
2019-06-19 16:16:13 +00:00
2020-03-09 01:50:33 +00:00
bool canReplacePartition(const DataPartPtr & src_part) const;
2019-09-03 11:32:25 +00:00
void writePartLog(
PartLogElement::Type type,
const ExecutionStatus & execution_status,
UInt64 elapsed_ns,
const String & new_part_name,
const DataPartPtr & result_part,
const DataPartsVector & source_parts,
const MergeListEntry * merge_entry);
2019-09-05 15:53:23 +00:00
2019-09-06 15:09:20 +00:00
/// If part is assigned to merge or mutation (possibly replicated)
2020-08-08 00:47:03 +00:00
/// Should be overridden by children, because they can have different
2019-09-10 11:21:59 +00:00
/// mechanisms for parts locking
2019-09-05 15:53:23 +00:00
virtual bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const = 0;
2020-04-03 10:40:46 +00:00
/// Returns the most recent mutation commands for the part that haven't been applied yet.
/// Used to obtain AlterConversions for the part and apply them on the fly. This
/// method has different implementations for replicated and non-replicated
/// MergeTree because they store mutations in different ways.
2020-03-24 17:05:38 +00:00
virtual MutationCommands getFirtsAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
2019-09-10 11:21:59 +00:00
/// Moves part to specified space, used in ALTER ... MOVE ... queries
2019-11-27 09:39:44 +00:00
bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);
2019-09-05 15:53:23 +00:00
2019-09-10 11:21:59 +00:00
/// Selects parts for move and moves them, used in background process
2019-09-05 15:53:23 +00:00
bool selectPartsAndMove();
2019-09-05 15:53:23 +00:00
private:
2019-09-06 15:09:20 +00:00
/// RAII Wrapper for atomic work with currently moving parts
2020-06-27 19:05:00 +00:00
/// Acquire them in constructor and remove them in destructor
2019-09-10 11:21:59 +00:00
/// Uses data.moving_parts_mutex
2019-09-05 15:53:23 +00:00
/// Holder of a batch of parts selected for moving, tied to the MergeTreeData instance they belong to.
/// The constructor and destructor are defined outside of this header.
/// NOTE(review): a destructor is declared but copy construction is not disabled; instances are
/// passed around through CurrentlyMovingPartsTaggerPtr (std::shared_ptr) — confirm nothing copies the struct itself.
struct CurrentlyMovingPartsTagger
{
/// The parts held by this tagger.
MergeTreeMovingParts parts_to_move;
/// The storage the parts belong to; kept by reference, so it has to outlive the tagger.
MergeTreeData & data;
/// Takes over `moving_parts_` (passed as rvalue) and binds the tagger to `data_`.
CurrentlyMovingPartsTagger(MergeTreeMovingParts && moving_parts_, MergeTreeData & data_);
/// Presumably unregisters the held parts from data.currently_moving_parts — confirm against the definition.
~CurrentlyMovingPartsTagger();
};
2020-10-14 07:22:48 +00:00
using CurrentlyMovingPartsTaggerPtr = std::shared_ptr<CurrentlyMovingPartsTagger>;
2019-09-10 11:21:59 +00:00
/// Move selected parts to corresponding disks
2020-10-14 07:22:48 +00:00
bool moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger);
2019-09-05 15:53:23 +00:00
2019-09-06 15:09:20 +00:00
/// Select parts for move and disks for them. Used in background moving processes.
2020-10-14 07:22:48 +00:00
CurrentlyMovingPartsTaggerPtr selectPartsForMove();
2019-09-06 15:09:20 +00:00
2019-09-10 11:21:59 +00:00
/// Check selected parts for movements. Used by ALTER ... MOVE queries.
2020-10-14 07:22:48 +00:00
CurrentlyMovingPartsTaggerPtr checkPartsForMove(const DataPartsVector & parts, SpacePtr space);
bool canUsePolymorphicParts(const MergeTreeSettings & settings, String * out_reason = nullptr) const;
2020-04-14 19:47:19 +00:00
2020-06-30 18:47:12 +00:00
std::mutex write_ahead_log_mutex;
2020-04-14 19:47:19 +00:00
WriteAheadLogPtr write_ahead_log;
virtual void startBackgroundMovesIfNeeded() = 0;
2020-07-12 12:58:17 +00:00
bool allow_nullable_key{};
2014-03-09 17:36:01 +00:00
};
}