mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 13:13:36 +00:00
Almost working ttl in IStorage
This commit is contained in:
parent
e812fa15d1
commit
9b3cc9e525
@ -110,10 +110,11 @@ void TTLBlockInputStream::readSuffixImpl()
|
||||
|
||||
void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
|
||||
{
|
||||
storage.rows_ttl_entry.expression->execute(block);
|
||||
const auto & rows_ttl = storage.getRowsTTL();
|
||||
rows_ttl.expression->execute(block);
|
||||
|
||||
const IColumn * ttl_column =
|
||||
block.getByName(storage.rows_ttl_entry.result_column).column.get();
|
||||
block.getByName(rows_ttl.result_column).column.get();
|
||||
|
||||
const auto & column_names = header.getNames();
|
||||
MutableColumns result_columns;
|
||||
@ -152,7 +153,8 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
|
||||
}
|
||||
|
||||
std::vector<String> columns_to_remove;
|
||||
for (const auto & [name, ttl_entry] : storage.column_ttl_entries_by_name)
|
||||
const auto & column_ttl_entries_by_name = storage.getColumnTTLs();
|
||||
for (const auto & [name, ttl_entry] : column_ttl_entries_by_name)
|
||||
{
|
||||
/// If we read not all table columns. E.g. while mutation.
|
||||
if (!block.has(name))
|
||||
@ -212,8 +214,9 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
|
||||
|
||||
void TTLBlockInputStream::updateMovesTTL(Block & block)
|
||||
{
|
||||
const auto & move_ttl_entries = storage.getMoveTTLs();
|
||||
std::vector<String> columns_to_remove;
|
||||
for (const auto & ttl_entry : storage.move_ttl_entries)
|
||||
for (const auto & ttl_entry : move_ttl_entries)
|
||||
{
|
||||
auto & new_ttl_info = new_ttl_infos.moves_ttl[ttl_entry.result_column];
|
||||
|
||||
|
@ -541,4 +541,50 @@ Names IStorage::getColumnsRequiredForSampling() const
|
||||
return {};
|
||||
}
|
||||
|
||||
|
||||
const StorageMetadataTTLField & IStorage::getRowsTTL() const
|
||||
{
|
||||
return table_ttl.rows_ttl;
|
||||
}
|
||||
|
||||
bool IStorage::hasRowsTTL() const
|
||||
{
|
||||
return table_ttl.rows_ttl.expression != nullptr;
|
||||
}
|
||||
|
||||
const StorageMetadataTTLFields & IStorage::getMoveTTLs() const
|
||||
{
|
||||
return table_ttl.move_ttl;
|
||||
}
|
||||
|
||||
bool IStorage::hasAnyMoveTTL() const
|
||||
{
|
||||
return !table_ttl.move_ttl.empty();
|
||||
}
|
||||
|
||||
const StorageMetadataTTLColumnEntries & IStorage::getColumnTTLs() const
|
||||
{
|
||||
return column_ttls_by_name;
|
||||
}
|
||||
|
||||
void IStorage::setColumnTTLs(const StorageMetadataTTLColumnEntries & column_ttls_by_name_)
|
||||
{
|
||||
column_ttls_by_name = column_ttls_by_name_;
|
||||
}
|
||||
|
||||
const StorageMetadataTableTTL & IStorage::getTableTTLs() const
|
||||
{
|
||||
return table_ttl;
|
||||
}
|
||||
|
||||
void IStorage::setTableTTLs(const StorageMetadataTableTTL & table_ttl_)
|
||||
{
|
||||
table_ttl = table_ttl_;
|
||||
}
|
||||
|
||||
bool IStorage::hasAnyTableTTL() const
|
||||
{
|
||||
return hasAnyMoveTTL() || hasRowsTTL();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -130,10 +130,7 @@ public:
|
||||
virtual bool hasEvenlyDistributedRead() const { return false; }
|
||||
|
||||
/// Returns true if there is set table TTL, any column TTL or any move TTL.
|
||||
virtual bool hasAnyTTL() const { return false; }
|
||||
|
||||
/// Returns true if there is set TTL for rows.
|
||||
virtual bool hasRowsTTL() const { return false; }
|
||||
virtual bool hasAnyTTL() const { return hasRowsTTL() || hasAnyTableTTL(); }
|
||||
|
||||
/// Optional size information of each physical column.
|
||||
/// Currently it's only used by the MergeTree family for query optimizations.
|
||||
@ -205,6 +202,9 @@ private:
|
||||
StorageMetadataKeyField sorting_key;
|
||||
StorageMetadataKeyField sampling_key;
|
||||
|
||||
StorageMetadataTTLColumnEntries column_ttls_by_name;
|
||||
StorageMetadataTableTTL table_ttl;
|
||||
|
||||
private:
|
||||
RWLockImpl::LockHolder tryLockTimed(
|
||||
const RWLock & rwlock, RWLockImpl::Type type, const String & query_id, const SettingSeconds & acquire_timeout) const;
|
||||
@ -514,6 +514,22 @@ public:
|
||||
/// Returns storage policy if storage supports it
|
||||
virtual StoragePolicyPtr getStoragePolicy() const { return {}; }
|
||||
|
||||
/// Returns true if there is set TTL for rows.
|
||||
const StorageMetadataTTLField & getRowsTTL() const;
|
||||
bool hasRowsTTL() const;
|
||||
|
||||
const StorageMetadataTTLFields & getMoveTTLs() const;
|
||||
bool hasAnyMoveTTL() const;
|
||||
|
||||
const StorageMetadataTableTTL & getTableTTLs() const;
|
||||
void setTableTTLs(const StorageMetadataTableTTL & table_ttl_);
|
||||
bool hasAnyTableTTL() const;
|
||||
|
||||
const StorageMetadataTTLColumnEntries & getColumnTTLs() const;
|
||||
void setColumnTTLs(const StorageMetadataTTLColumnEntries & column_ttls_by_name_);
|
||||
bool hasAnyColumnTTL() const { return !column_ttls_by_name.empty(); }
|
||||
|
||||
|
||||
/// If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
|
||||
/// Used for:
|
||||
/// - Simple count() opimization
|
||||
|
@ -580,55 +580,17 @@ void MergeTreeData::initPartitionKey(ASTPtr partition_by_ast)
|
||||
setPartitionKey(new_partition_key);
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
void checkTTLExpression(const ExpressionActionsPtr & ttl_expression, const String & result_column_name)
|
||||
{
|
||||
for (const auto & action : ttl_expression->getActions())
|
||||
{
|
||||
if (action.type == ExpressionAction::APPLY_FUNCTION)
|
||||
{
|
||||
IFunctionBase & func = *action.function_base;
|
||||
if (!func.isDeterministic())
|
||||
throw Exception("TTL expression cannot contain non-deterministic functions, "
|
||||
"but contains function " + func.getName(), ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
}
|
||||
|
||||
const auto & result_column = ttl_expression->getSampleBlock().getByName(result_column_name);
|
||||
|
||||
if (!typeid_cast<const DataTypeDateTime *>(result_column.type.get())
|
||||
&& !typeid_cast<const DataTypeDate *>(result_column.type.get()))
|
||||
{
|
||||
throw Exception("TTL expression result column should have DateTime or Date type, but has "
|
||||
+ result_column.type->getName(), ErrorCodes::BAD_TTL_EXPRESSION);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
|
||||
const ASTPtr & new_ttl_table_ast, bool only_check)
|
||||
{
|
||||
|
||||
auto new_column_ttls = new_columns.getColumnTTLs();
|
||||
auto new_column_ttl_asts = new_columns.getColumnTTLs();
|
||||
|
||||
auto create_ttl_entry = [this, &new_columns](ASTPtr ttl_ast)
|
||||
{
|
||||
TTLEntry result;
|
||||
StorageMetadataTTLColumnEntries new_column_ttl_by_name = getColumnTTLs();
|
||||
|
||||
auto syntax_result = SyntaxAnalyzer(global_context).analyze(ttl_ast, new_columns.getAllPhysical());
|
||||
result.expression = ExpressionAnalyzer(ttl_ast, syntax_result, global_context).getActions(false);
|
||||
result.destination_type = DataDestinationType::DELETE;
|
||||
result.result_column = ttl_ast->getColumnName();
|
||||
|
||||
checkTTLExpression(result.expression, result.result_column);
|
||||
return result;
|
||||
};
|
||||
|
||||
if (!new_column_ttls.empty())
|
||||
if (!new_column_ttl_asts.empty())
|
||||
{
|
||||
NameSet columns_ttl_forbidden;
|
||||
|
||||
@ -640,23 +602,24 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
|
||||
for (const auto & col : getColumnsRequiredForSortingKey())
|
||||
columns_ttl_forbidden.insert(col);
|
||||
|
||||
for (const auto & [name, ast] : new_column_ttls)
|
||||
for (const auto & [name, ast] : new_column_ttl_asts)
|
||||
{
|
||||
if (columns_ttl_forbidden.count(name))
|
||||
throw Exception("Trying to set TTL for key column " + name, ErrorCodes::ILLEGAL_COLUMN);
|
||||
else
|
||||
{
|
||||
auto new_ttl_entry = create_ttl_entry(ast);
|
||||
auto new_ttl_entry = StorageMetadataTTLField::getTTLFromAST(ast, new_columns, global_context);
|
||||
new_column_ttl_by_name[name] = new_ttl_entry;
|
||||
}
|
||||
}
|
||||
if (!only_check)
|
||||
column_ttl_entries_by_name[name] = new_ttl_entry;
|
||||
}
|
||||
}
|
||||
setColumnTTLs(new_column_ttl_by_name);
|
||||
}
|
||||
|
||||
if (new_ttl_table_ast)
|
||||
{
|
||||
std::vector<TTLEntry> update_move_ttl_entries;
|
||||
TTLEntry update_rows_ttl_entry;
|
||||
StorageMetadataTTLFields update_move_ttl_entries;
|
||||
StorageMetadataTTLField update_rows_ttl_entry;
|
||||
|
||||
bool seen_delete_ttl = false;
|
||||
for (const auto & ttl_element_ptr : new_ttl_table_ast->children)
|
||||
@ -672,20 +635,16 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
|
||||
throw Exception("More than one DELETE TTL expression is not allowed", ErrorCodes::BAD_TTL_EXPRESSION);
|
||||
}
|
||||
|
||||
auto new_rows_ttl_entry = create_ttl_entry(ttl_element->children[0]);
|
||||
if (!only_check)
|
||||
auto new_rows_ttl_entry = StorageMetadataTTLField::getTTLFromAST(ttl_element_ptr, new_columns, global_context);
|
||||
update_rows_ttl_entry = new_rows_ttl_entry;
|
||||
|
||||
seen_delete_ttl = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
auto new_ttl_entry = create_ttl_entry(ttl_element->children[0]);
|
||||
auto new_ttl_entry = StorageMetadataTTLField::getTTLFromAST(ttl_element_ptr, new_columns, global_context);
|
||||
|
||||
new_ttl_entry.entry_ast = ttl_element_ptr;
|
||||
new_ttl_entry.destination_type = ttl_element->destination_type;
|
||||
new_ttl_entry.destination_name = ttl_element->destination_name;
|
||||
if (!new_ttl_entry.getDestination(getStoragePolicy()))
|
||||
if (!getDestinationForTTL(new_ttl_entry))
|
||||
{
|
||||
String message;
|
||||
if (new_ttl_entry.destination_type == DataDestinationType::DISK)
|
||||
@ -695,18 +654,21 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
|
||||
throw Exception(message, ErrorCodes::BAD_TTL_EXPRESSION);
|
||||
}
|
||||
|
||||
if (!only_check)
|
||||
update_move_ttl_entries.emplace_back(std::move(new_ttl_entry));
|
||||
}
|
||||
}
|
||||
|
||||
if (!only_check)
|
||||
{
|
||||
rows_ttl_entry = update_rows_ttl_entry;
|
||||
ttl_table_ast = new_ttl_table_ast;
|
||||
StorageMetadataTableTTL new_table_ttl
|
||||
{
|
||||
.definition_ast = new_ttl_table_ast,
|
||||
.rows_ttl = update_rows_ttl_entry,
|
||||
.move_ttl = update_move_ttl_entries,
|
||||
};
|
||||
|
||||
auto move_ttl_entries_lock = std::lock_guard<std::mutex>(move_ttl_entries_mutex);
|
||||
move_ttl_entries = update_move_ttl_entries;
|
||||
setTableTTLs(new_table_ttl);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2918,7 +2880,7 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_
|
||||
auto ttl_entry = selectTTLEntryForTTLInfos(ttl_infos, time_of_move);
|
||||
if (ttl_entry)
|
||||
{
|
||||
SpacePtr destination_ptr = ttl_entry->getDestination(getStoragePolicy());
|
||||
SpacePtr destination_ptr = getDestinationForTTL(*ttl_entry);
|
||||
if (!destination_ptr)
|
||||
{
|
||||
if (ttl_entry->destination_type == DataDestinationType::VOLUME)
|
||||
@ -2952,37 +2914,39 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_
|
||||
return reservation;
|
||||
}
|
||||
|
||||
SpacePtr MergeTreeData::TTLEntry::getDestination(StoragePolicyPtr policy) const
|
||||
SpacePtr MergeTreeData::getDestinationForTTL(const StorageMetadataTTLField & ttl) const
|
||||
{
|
||||
if (destination_type == DataDestinationType::VOLUME)
|
||||
return policy->getVolumeByName(destination_name);
|
||||
else if (destination_type == DataDestinationType::DISK)
|
||||
return policy->getDiskByName(destination_name);
|
||||
auto policy = getStoragePolicy();
|
||||
if (ttl.destination_type == DataDestinationType::VOLUME)
|
||||
return policy->getVolumeByName(ttl.destination_name);
|
||||
else if (ttl.destination_type == DataDestinationType::DISK)
|
||||
return policy->getDiskByName(ttl.destination_name);
|
||||
else
|
||||
return {};
|
||||
}
|
||||
|
||||
bool MergeTreeData::TTLEntry::isPartInDestination(StoragePolicyPtr policy, const IMergeTreeDataPart & part) const
|
||||
bool MergeTreeData::isPartInTTLDestination(const StorageMetadataTTLField & ttl, const IMergeTreeDataPart & part) const
|
||||
{
|
||||
if (destination_type == DataDestinationType::VOLUME)
|
||||
auto policy = getStoragePolicy();
|
||||
if (ttl.destination_type == DataDestinationType::VOLUME)
|
||||
{
|
||||
for (const auto & disk : policy->getVolumeByName(destination_name)->getDisks())
|
||||
for (const auto & disk : policy->getVolumeByName(ttl.destination_name)->getDisks())
|
||||
if (disk->getName() == part.volume->getDisk()->getName())
|
||||
return true;
|
||||
}
|
||||
else if (destination_type == DataDestinationType::DISK)
|
||||
return policy->getDiskByName(destination_name)->getName() == part.volume->getDisk()->getName();
|
||||
else if (ttl.destination_type == DataDestinationType::DISK)
|
||||
return policy->getDiskByName(ttl.destination_name)->getName() == part.volume->getDisk()->getName();
|
||||
return false;
|
||||
}
|
||||
|
||||
std::optional<MergeTreeData::TTLEntry> MergeTreeData::selectTTLEntryForTTLInfos(
|
||||
const IMergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
time_t time_of_move) const
|
||||
std::optional<StorageMetadataTTLField>
|
||||
MergeTreeData::selectTTLEntryForTTLInfos(const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const
|
||||
{
|
||||
time_t max_max_ttl = 0;
|
||||
std::vector<DB::MergeTreeData::TTLEntry>::const_iterator best_entry_it;
|
||||
std::vector<StorageMetadataTTLField>::const_iterator best_entry_it;
|
||||
|
||||
auto lock = std::lock_guard(move_ttl_entries_mutex);
|
||||
const auto & move_ttl_entries = getMoveTTLs();
|
||||
for (auto ttl_entry_it = move_ttl_entries.begin(); ttl_entry_it != move_ttl_entries.end(); ++ttl_entry_it)
|
||||
{
|
||||
auto ttl_info_it = ttl_infos.moves_ttl.find(ttl_entry_it->result_column);
|
||||
@ -2996,7 +2960,7 @@ std::optional<MergeTreeData::TTLEntry> MergeTreeData::selectTTLEntryForTTLInfos(
|
||||
}
|
||||
}
|
||||
|
||||
return max_max_ttl ? *best_entry_it : std::optional<MergeTreeData::TTLEntry>();
|
||||
return max_max_ttl ? *best_entry_it : std::optional<StorageMetadataTTLField>();
|
||||
}
|
||||
|
||||
MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
|
||||
@ -3416,7 +3380,7 @@ bool MergeTreeData::areBackgroundMovesNeeded() const
|
||||
if (policy->getVolumes().size() > 1)
|
||||
return true;
|
||||
|
||||
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1 && !move_ttl_entries.empty();
|
||||
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1 && hasAnyMoveTTL();
|
||||
}
|
||||
|
||||
bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space)
|
||||
@ -3555,7 +3519,7 @@ ColumnDependencies MergeTreeData::getColumnDependencies(const NameSet & updated_
|
||||
|
||||
if (hasRowsTTL())
|
||||
{
|
||||
if (add_dependent_columns(rows_ttl_entry.expression, required_ttl_columns))
|
||||
if (add_dependent_columns(getRowsTTL().expression, required_ttl_columns))
|
||||
{
|
||||
/// Filter all columns, if rows TTL expression have to be recalculated.
|
||||
for (const auto & column : getColumns().getAllPhysical())
|
||||
@ -3563,13 +3527,13 @@ ColumnDependencies MergeTreeData::getColumnDependencies(const NameSet & updated_
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & [name, entry] : column_ttl_entries_by_name)
|
||||
for (const auto & [name, entry] : getColumnTTLs())
|
||||
{
|
||||
if (add_dependent_columns(entry.expression, required_ttl_columns))
|
||||
updated_ttl_columns.insert(name);
|
||||
}
|
||||
|
||||
for (const auto & entry : move_ttl_entries)
|
||||
for (const auto & entry : getMoveTTLs())
|
||||
add_dependent_columns(entry.expression, required_ttl_columns);
|
||||
|
||||
for (const auto & column : indices_columns)
|
||||
|
@ -518,11 +518,6 @@ public:
|
||||
|
||||
bool hasSkipIndices() const { return !skip_indices.empty(); }
|
||||
|
||||
bool hasAnyColumnTTL() const { return !column_ttl_entries_by_name.empty(); }
|
||||
bool hasAnyMoveTTL() const { return !move_ttl_entries.empty(); }
|
||||
bool hasRowsTTL() const override { return !rows_ttl_entry.isEmpty(); }
|
||||
bool hasAnyTTL() const override { return hasRowsTTL() || hasAnyMoveTTL() || hasAnyColumnTTL(); }
|
||||
|
||||
/// Check that the part is not broken and calculate the checksums for it if they are not present.
|
||||
MutableDataPartPtr loadPartAndFixMetadata(const VolumePtr & volume, const String & relative_path) const;
|
||||
|
||||
@ -624,6 +619,13 @@ public:
|
||||
|
||||
/// Return alter conversions for part which must be applied on fly.
|
||||
AlterConversions getAlterConversionsForPart(const MergeTreeDataPartPtr part) const;
|
||||
/// Returns destination disk or volume for the TTL rule according to current
|
||||
/// storage policy
|
||||
SpacePtr getDestinationForTTL(const StorageMetadataTTLField & ttl) const;
|
||||
|
||||
/// Checks if given part already belongs destination disk or volume for the
|
||||
/// TTL rule.
|
||||
bool isPartInTTLDestination(const StorageMetadataTTLField & ttl, const IMergeTreeDataPart & part) const;
|
||||
|
||||
MergeTreeDataFormatVersion format_version;
|
||||
|
||||
@ -646,39 +648,13 @@ public:
|
||||
ExpressionActionsPtr primary_key_and_skip_indices_expr;
|
||||
ExpressionActionsPtr sorting_key_and_skip_indices_expr;
|
||||
|
||||
struct TTLEntry
|
||||
{
|
||||
ExpressionActionsPtr expression;
|
||||
String result_column;
|
||||
std::optional<StorageMetadataTTLField> selectTTLEntryForTTLInfos(const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;
|
||||
|
||||
/// Name and type of a destination are only valid in table-level context.
|
||||
DataDestinationType destination_type;
|
||||
String destination_name;
|
||||
|
||||
ASTPtr entry_ast;
|
||||
|
||||
/// Returns destination disk or volume for this rule.
|
||||
SpacePtr getDestination(StoragePolicyPtr policy) const;
|
||||
|
||||
/// Checks if given part already belongs destination disk or volume for this rule.
|
||||
bool isPartInDestination(StoragePolicyPtr policy, const IMergeTreeDataPart & part) const;
|
||||
|
||||
bool isEmpty() const { return expression == nullptr; }
|
||||
};
|
||||
|
||||
std::optional<TTLEntry> selectTTLEntryForTTLInfos(const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;
|
||||
|
||||
using TTLEntriesByName = std::unordered_map<String, TTLEntry>;
|
||||
TTLEntriesByName column_ttl_entries_by_name;
|
||||
|
||||
TTLEntry rows_ttl_entry;
|
||||
|
||||
/// This mutex is required for background move operations which do not obtain global locks.
|
||||
/// This mutex is required for background move operations which do not
|
||||
/// obtain global locks.
|
||||
/// TODO (alesap) It will be removed after metadata became atomic
|
||||
mutable std::mutex move_ttl_entries_mutex;
|
||||
|
||||
/// Vector rw operations have to be done under "move_ttl_entries_mutex".
|
||||
std::vector<TTLEntry> move_ttl_entries;
|
||||
|
||||
/// Limiting parallel sends per one table, used in DataPartsExchange
|
||||
std::atomic_uint current_table_sends {0};
|
||||
|
||||
|
@ -77,10 +77,12 @@ void buildScatterSelector(
|
||||
}
|
||||
|
||||
/// Computes ttls and updates ttl infos
|
||||
void updateTTL(const MergeTreeData::TTLEntry & ttl_entry,
|
||||
void updateTTL(
|
||||
const StorageMetadataTTLField & ttl_entry,
|
||||
IMergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
DB::MergeTreeDataPartTTLInfo & ttl_info,
|
||||
Block & block, bool update_part_min_max_ttls)
|
||||
Block & block,
|
||||
bool update_part_min_max_ttls)
|
||||
{
|
||||
bool remove_column = false;
|
||||
if (!block.has(ttl_entry.result_column))
|
||||
@ -228,7 +230,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
|
||||
size_t expected_size = block.bytes();
|
||||
|
||||
DB::IMergeTreeDataPart::TTLInfos move_ttl_infos;
|
||||
for (const auto & ttl_entry : data.move_ttl_entries)
|
||||
const auto & move_ttl_entries = data.getMoveTTLs();
|
||||
for (const auto & ttl_entry : move_ttl_entries)
|
||||
updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
|
||||
|
||||
NamesAndTypesList columns = data.getColumns().getAllPhysical().filter(block.getNames());
|
||||
@ -287,9 +290,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
|
||||
}
|
||||
|
||||
if (data.hasRowsTTL())
|
||||
updateTTL(data.rows_ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);
|
||||
updateTTL(data.getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);
|
||||
|
||||
for (const auto & [name, ttl_entry] : data.column_ttl_entries_by_name)
|
||||
for (const auto & [name, ttl_entry] : data.getColumnTTLs())
|
||||
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.columns_ttl[name], block, true);
|
||||
|
||||
new_data_part->ttl_infos.update(move_ttl_infos);
|
||||
|
@ -128,14 +128,14 @@ bool MergeTreePartsMover::selectPartsForMove(
|
||||
if (!can_move(part, &reason))
|
||||
continue;
|
||||
|
||||
auto ttl_entry = part->storage.selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move);
|
||||
auto ttl_entry = data->selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move);
|
||||
auto to_insert = need_to_move.find(part->volume->getDisk());
|
||||
ReservationPtr reservation;
|
||||
if (ttl_entry)
|
||||
{
|
||||
auto destination = ttl_entry->getDestination(policy);
|
||||
if (destination && !ttl_entry->isPartInDestination(policy, *part))
|
||||
reservation = part->storage.tryReserveSpace(part->getBytesOnDisk(), ttl_entry->getDestination(policy));
|
||||
auto destination = data->getDestinationForTTL(*ttl_entry);
|
||||
if (destination && !data->isPartInTTLDestination(*ttl_entry, *part))
|
||||
reservation = data->tryReserveSpace(part->getBytesOnDisk(), data->getDestinationForTTL(*ttl_entry));
|
||||
}
|
||||
|
||||
if (reservation) /// Found reservation by TTL rule.
|
||||
|
@ -53,7 +53,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
|
||||
if (data.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
partition_key = formattedAST(data.getPartitionKey().expression_list_ast);
|
||||
|
||||
ttl_table = formattedAST(data.ttl_table_ast);
|
||||
ttl_table = formattedAST(data.getTableTTLs().definition_ast);
|
||||
|
||||
skip_indices = data.getIndices().toString();
|
||||
if (data.canUseAdaptiveGranularity())
|
||||
|
@ -39,8 +39,7 @@ public:
|
||||
return part->storage.mayBenefitFromIndexForIn(left_in_operand, query_context);
|
||||
}
|
||||
|
||||
bool hasAnyTTL() const override { return part->storage.hasAnyTTL(); }
|
||||
bool hasRowsTTL() const override { return part->storage.hasRowsTTL(); }
|
||||
//bool hasAnyTTL() const override { return part->storage.hasAnyTTL(); }
|
||||
|
||||
ColumnDependencies getColumnDependencies(const NameSet & updated_columns) const override
|
||||
{
|
||||
@ -65,6 +64,8 @@ protected:
|
||||
setColumns(part_->storage.getColumns());
|
||||
setIndices(part_->storage.getIndices());
|
||||
setSortingKey(part_->storage.getSortingKey());
|
||||
setColumnTTLs(part->storage.getColumnTTLs());
|
||||
setTableTTLs(part->storage.getTableTTLs());
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -6,10 +6,21 @@
|
||||
#include <Parsers/ASTExpressionList.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <Parsers/ASTTTLElement.h>
|
||||
#include <Functions/IFunction.h>
|
||||
|
||||
#include <DataTypes/DataTypeDateTime.h>
|
||||
#include <DataTypes/DataTypeDate.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_TTL_EXPRESSION;
|
||||
};
|
||||
|
||||
StorageInMemoryMetadata::StorageInMemoryMetadata(
|
||||
const ColumnsDescription & columns_,
|
||||
const IndicesDescription & indices_,
|
||||
@ -138,4 +149,63 @@ StorageMetadataKeyField StorageMetadataKeyField::getKeyFromAST(const ASTPtr & de
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
void checkTTLExpression(const ExpressionActionsPtr & ttl_expression, const String & result_column_name)
|
||||
{
|
||||
for (const auto & action : ttl_expression->getActions())
|
||||
{
|
||||
if (action.type == ExpressionAction::APPLY_FUNCTION)
|
||||
{
|
||||
IFunctionBase & func = *action.function_base;
|
||||
if (!func.isDeterministic())
|
||||
throw Exception(
|
||||
"TTL expression cannot contain non-deterministic functions, "
|
||||
"but contains function "
|
||||
+ func.getName(),
|
||||
ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
}
|
||||
|
||||
const auto & result_column = ttl_expression->getSampleBlock().getByName(result_column_name);
|
||||
|
||||
if (!typeid_cast<const DataTypeDateTime *>(result_column.type.get())
|
||||
&& !typeid_cast<const DataTypeDate *>(result_column.type.get()))
|
||||
{
|
||||
throw Exception(
|
||||
"TTL expression result column should have DateTime or Date type, but has " + result_column.type->getName(),
|
||||
ErrorCodes::BAD_TTL_EXPRESSION);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
StorageMetadataTTLField StorageMetadataTTLField::getTTLFromAST(const ASTPtr & definition_ast, const ColumnsDescription & columns, const Context & context)
|
||||
{
|
||||
StorageMetadataTTLField result;
|
||||
const auto * ttl_element = definition_ast->as<ASTTTLElement>();
|
||||
|
||||
/// First child is expression, like `TTL expr TO DISK`
|
||||
if (ttl_element != nullptr)
|
||||
result.definition_ast = ttl_element->children.front()->clone();
|
||||
else
|
||||
result.definition_ast = definition_ast->clone();
|
||||
|
||||
auto ttl_ast = result.definition_ast->clone();
|
||||
auto syntax_result = SyntaxAnalyzer(context).analyze(ttl_ast, columns.getAllPhysical());
|
||||
result.expression = ExpressionAnalyzer(ttl_ast, syntax_result, context).getActions(false);
|
||||
|
||||
if (ttl_element != nullptr)
|
||||
{
|
||||
result.destination_type = ttl_element->destination_type;
|
||||
result.destination_name = ttl_element->destination_name;
|
||||
}
|
||||
|
||||
result.result_column = ttl_ast->getColumnName();
|
||||
|
||||
checkTTLExpression(result.expression, result.result_column);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
@ -84,6 +84,20 @@ struct StorageMetadataTTLField
|
||||
DataDestinationType destination_type;
|
||||
|
||||
String destination_name;
|
||||
|
||||
static StorageMetadataTTLField getTTLFromAST(const ASTPtr & definition_ast, const ColumnsDescription & columns, const Context & context);
|
||||
};
|
||||
|
||||
using StorageMetadataTTLColumnEntries = std::unordered_map<String, StorageMetadataTTLField>;
|
||||
using StorageMetadataTTLFields = std::vector<StorageMetadataTTLField>;
|
||||
|
||||
struct StorageMetadataTableTTL
|
||||
{
|
||||
ASTPtr definition_ast;
|
||||
|
||||
StorageMetadataTTLField rows_ttl;
|
||||
|
||||
StorageMetadataTTLFields move_ttl;
|
||||
};
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user