mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Merge pull request #27019 from lthaooo/materialize_ttl_recalculate_only
Improvement of Materialize TTL
This commit is contained in:
commit
eaa49f56dd
@ -76,17 +76,17 @@ TTLBlockInputStream::TTLBlockInputStream(
|
||||
|
||||
algorithms.emplace_back(std::make_unique<TTLColumnAlgorithm>(
|
||||
description, old_ttl_infos.columns_ttl[name], current_time_,
|
||||
force_, name, default_expression, default_column_name));
|
||||
force_, name, default_expression, default_column_name, isCompactPart(data_part)));
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & move_ttl : metadata_snapshot_->getMoveTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLMoveAlgorithm>(
|
||||
move_ttl, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
move_ttl, TTLUpdateField::MOVES_TTL, move_ttl.result_column, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
|
||||
|
||||
for (const auto & recompression_ttl : metadata_snapshot_->getRecompressionTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLRecompressionAlgorithm>(
|
||||
recompression_ttl, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
|
||||
}
|
||||
|
||||
Block reorderColumns(Block block, const Block & header)
|
||||
|
77
src/DataStreams/TTLCalcInputStream.cpp
Normal file
77
src/DataStreams/TTLCalcInputStream.cpp
Normal file
@ -0,0 +1,77 @@
|
||||
#include <DataStreams/TTLCalcInputStream.h>
|
||||
#include <DataStreams/TTLUpdateInfoAlgorithm.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
TTLCalcInputStream::TTLCalcInputStream(
|
||||
const BlockInputStreamPtr & input_,
|
||||
const MergeTreeData & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const MergeTreeData::MutableDataPartPtr & data_part_,
|
||||
time_t current_time_,
|
||||
bool force_)
|
||||
: data_part(data_part_)
|
||||
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLCalcInputStream)"))
|
||||
{
|
||||
children.push_back(input_);
|
||||
header = children.at(0)->getHeader();
|
||||
auto old_ttl_infos = data_part->ttl_infos;
|
||||
|
||||
if (metadata_snapshot_->hasRowsTTL())
|
||||
{
|
||||
const auto & rows_ttl = metadata_snapshot_->getRowsTTL();
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
rows_ttl, TTLUpdateField::TABLE_TTL, rows_ttl.result_column, old_ttl_infos.table_ttl, current_time_, force_));
|
||||
}
|
||||
|
||||
for (const auto & where_ttl : metadata_snapshot_->getRowsWhereTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
where_ttl, TTLUpdateField::ROWS_WHERE_TTL, where_ttl.result_column, old_ttl_infos.rows_where_ttl[where_ttl.result_column], current_time_, force_));
|
||||
|
||||
for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
group_by_ttl, TTLUpdateField::GROUP_BY_TTL, group_by_ttl.result_column, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_));
|
||||
|
||||
if (metadata_snapshot_->hasAnyColumnTTL())
|
||||
{
|
||||
for (const auto & [name, description] : metadata_snapshot_->getColumnTTLs())
|
||||
{
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
description, TTLUpdateField::COLUMNS_TTL, name, old_ttl_infos.columns_ttl[name], current_time_, force_));
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & move_ttl : metadata_snapshot_->getMoveTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
move_ttl, TTLUpdateField::MOVES_TTL, move_ttl.result_column, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
|
||||
|
||||
for (const auto & recompression_ttl : metadata_snapshot_->getRecompressionTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
|
||||
}
|
||||
|
||||
Block TTLCalcInputStream::readImpl()
|
||||
{
|
||||
auto block = children.at(0)->read();
|
||||
for (const auto & algorithm : algorithms)
|
||||
algorithm->execute(block);
|
||||
|
||||
if (!block)
|
||||
return block;
|
||||
|
||||
Block res;
|
||||
for (const auto & col : header)
|
||||
res.insert(block.getByName(col.name));
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
void TTLCalcInputStream::readSuffixImpl()
|
||||
{
|
||||
data_part->ttl_infos = {};
|
||||
for (const auto & algorithm : algorithms)
|
||||
algorithm->finalize(data_part);
|
||||
}
|
||||
|
||||
}
|
44
src/DataStreams/TTLCalcInputStream.h
Normal file
44
src/DataStreams/TTLCalcInputStream.h
Normal file
@ -0,0 +1,44 @@
|
||||
#pragma once
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/MergeTree/IMergeTreeDataPart.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
|
||||
#include <DataStreams/ITTLAlgorithm.h>
|
||||
|
||||
#include <common/DateLUT.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class TTLCalcInputStream : public IBlockInputStream
|
||||
{
|
||||
public:
|
||||
TTLCalcInputStream(
|
||||
const BlockInputStreamPtr & input_,
|
||||
const MergeTreeData & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const MergeTreeData::MutableDataPartPtr & data_part_,
|
||||
time_t current_time,
|
||||
bool force_
|
||||
);
|
||||
|
||||
String getName() const override { return "TTL_CALC"; }
|
||||
Block getHeader() const override { return header; }
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
||||
/// Finalizes ttl infos and updates data part
|
||||
void readSuffixImpl() override;
|
||||
|
||||
private:
|
||||
std::vector<TTLAlgorithmPtr> algorithms;
|
||||
|
||||
/// ttl_infos and empty_columns are updating while reading
|
||||
const MergeTreeData::MutableDataPartPtr & data_part;
|
||||
Poco::Logger * log;
|
||||
Block header;
|
||||
};
|
||||
|
||||
}
|
@ -10,11 +10,13 @@ TTLColumnAlgorithm::TTLColumnAlgorithm(
|
||||
bool force_,
|
||||
const String & column_name_,
|
||||
const ExpressionActionsPtr & default_expression_,
|
||||
const String & default_column_name_)
|
||||
const String & default_column_name_,
|
||||
bool is_compact_part_)
|
||||
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
|
||||
, column_name(column_name_)
|
||||
, default_expression(default_expression_)
|
||||
, default_column_name(default_column_name_)
|
||||
, is_compact_part(is_compact_part_)
|
||||
{
|
||||
if (!isMinTTLExpired())
|
||||
{
|
||||
@ -40,7 +42,7 @@ void TTLColumnAlgorithm::execute(Block & block)
|
||||
return;
|
||||
|
||||
/// Later drop full column
|
||||
if (isMaxTTLExpired())
|
||||
if (isMaxTTLExpired() && !is_compact_part)
|
||||
return;
|
||||
|
||||
auto default_column = executeExpressionAndGetColumn(default_expression, block, default_column_name);
|
||||
|
@ -17,7 +17,9 @@ public:
|
||||
bool force_,
|
||||
const String & column_name_,
|
||||
const ExpressionActionsPtr & default_expression_,
|
||||
const String & default_column_name_);
|
||||
const String & default_column_name_,
|
||||
bool is_compact_part_
|
||||
);
|
||||
|
||||
void execute(Block & block) override;
|
||||
void finalize(const MutableDataPartPtr & data_part) const override;
|
||||
@ -28,6 +30,7 @@ private:
|
||||
const String default_column_name;
|
||||
|
||||
bool is_fully_empty = true;
|
||||
bool is_compact_part;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -4,8 +4,15 @@ namespace DB
|
||||
{
|
||||
|
||||
TTLUpdateInfoAlgorithm::TTLUpdateInfoAlgorithm(
|
||||
const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
|
||||
const TTLDescription & description_,
|
||||
const TTLUpdateField ttl_update_field_,
|
||||
const String ttl_update_key_,
|
||||
const TTLInfo & old_ttl_info_,
|
||||
time_t current_time_,
|
||||
bool force_)
|
||||
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
|
||||
, ttl_update_field(ttl_update_field_)
|
||||
, ttl_update_key(ttl_update_key_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -22,26 +29,37 @@ void TTLUpdateInfoAlgorithm::execute(Block & block)
|
||||
}
|
||||
}
|
||||
|
||||
TTLMoveAlgorithm::TTLMoveAlgorithm(
|
||||
const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
|
||||
: TTLUpdateInfoAlgorithm(description_, old_ttl_info_, current_time_, force_)
|
||||
void TTLUpdateInfoAlgorithm::finalize(const MutableDataPartPtr & data_part) const
|
||||
{
|
||||
}
|
||||
if (ttl_update_field == TTLUpdateField::RECOMPRESSION_TTL)
|
||||
{
|
||||
data_part->ttl_infos.recompression_ttl[ttl_update_key] = new_ttl_info;
|
||||
}
|
||||
else if (ttl_update_field == TTLUpdateField::MOVES_TTL)
|
||||
{
|
||||
data_part->ttl_infos.moves_ttl[ttl_update_key] = new_ttl_info;
|
||||
}
|
||||
else if (ttl_update_field == TTLUpdateField::GROUP_BY_TTL)
|
||||
{
|
||||
data_part->ttl_infos.group_by_ttl[ttl_update_key] = new_ttl_info;
|
||||
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
|
||||
}
|
||||
else if (ttl_update_field == TTLUpdateField::ROWS_WHERE_TTL)
|
||||
{
|
||||
data_part->ttl_infos.rows_where_ttl[ttl_update_key] = new_ttl_info;
|
||||
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
|
||||
}
|
||||
else if (ttl_update_field == TTLUpdateField::TABLE_TTL)
|
||||
{
|
||||
data_part->ttl_infos.table_ttl = new_ttl_info;
|
||||
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
|
||||
}
|
||||
else if (ttl_update_field == TTLUpdateField::COLUMNS_TTL)
|
||||
{
|
||||
data_part->ttl_infos.columns_ttl[ttl_update_key] = new_ttl_info;
|
||||
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
|
||||
}
|
||||
|
||||
void TTLMoveAlgorithm::finalize(const MutableDataPartPtr & data_part) const
|
||||
{
|
||||
data_part->ttl_infos.moves_ttl[description.result_column] = new_ttl_info;
|
||||
}
|
||||
|
||||
TTLRecompressionAlgorithm::TTLRecompressionAlgorithm(
|
||||
const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
|
||||
: TTLUpdateInfoAlgorithm(description_, old_ttl_info_, current_time_, force_)
|
||||
{
|
||||
}
|
||||
|
||||
void TTLRecompressionAlgorithm::finalize(const MutableDataPartPtr & data_part) const
|
||||
{
|
||||
data_part->ttl_infos.recompression_ttl[description.result_column] = new_ttl_info;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -5,28 +5,35 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
enum class TTLUpdateField
|
||||
{
|
||||
COLUMNS_TTL,
|
||||
TABLE_TTL,
|
||||
ROWS_WHERE_TTL,
|
||||
MOVES_TTL,
|
||||
RECOMPRESSION_TTL,
|
||||
GROUP_BY_TTL,
|
||||
};
|
||||
|
||||
/// Calculates new ttl_info and does nothing with data.
|
||||
class TTLUpdateInfoAlgorithm : public ITTLAlgorithm
|
||||
{
|
||||
public:
|
||||
TTLUpdateInfoAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
|
||||
TTLUpdateInfoAlgorithm(
|
||||
const TTLDescription & description_,
|
||||
const TTLUpdateField ttl_update_field_,
|
||||
const String ttl_update_key_,
|
||||
const TTLInfo & old_ttl_info_,
|
||||
time_t current_time_, bool force_
|
||||
);
|
||||
|
||||
void execute(Block & block) override;
|
||||
void finalize(const MutableDataPartPtr & data_part) const override = 0;
|
||||
void finalize(const MutableDataPartPtr & data_part) const override;
|
||||
|
||||
private:
|
||||
const TTLUpdateField ttl_update_field;
|
||||
const String ttl_update_key;
|
||||
};
|
||||
|
||||
class TTLMoveAlgorithm final : public TTLUpdateInfoAlgorithm
|
||||
{
|
||||
public:
|
||||
TTLMoveAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
|
||||
void finalize(const MutableDataPartPtr & data_part) const override;
|
||||
};
|
||||
|
||||
class TTLRecompressionAlgorithm final : public TTLUpdateInfoAlgorithm
|
||||
{
|
||||
public:
|
||||
TTLRecompressionAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
|
||||
void finalize(const MutableDataPartPtr & data_part) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -156,7 +156,7 @@ ColumnDependencies getAllColumnDependencies(const StorageMetadataPtr & metadata_
|
||||
ColumnDependencies dependencies;
|
||||
while (!new_updated_columns.empty())
|
||||
{
|
||||
auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns);
|
||||
auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns, true);
|
||||
new_updated_columns.clear();
|
||||
for (const auto & dependency : new_dependencies)
|
||||
{
|
||||
@ -303,6 +303,15 @@ static NameSet getKeyColumns(const StoragePtr & storage, const StorageMetadataPt
|
||||
return key_columns;
|
||||
}
|
||||
|
||||
static bool materializeTTLRecalculateOnly(const StoragePtr & storage)
|
||||
{
|
||||
auto storage_from_merge_tree_data_part = std::dynamic_pointer_cast<StorageFromMergeTreeDataPart>(storage);
|
||||
if (!storage_from_merge_tree_data_part)
|
||||
return false;
|
||||
|
||||
return storage_from_merge_tree_data_part->materializeTTLRecalculateOnly();
|
||||
}
|
||||
|
||||
static void validateUpdateColumns(
|
||||
const StoragePtr & storage,
|
||||
const StorageMetadataPtr & metadata_snapshot, const NameSet & updated_columns,
|
||||
@ -394,8 +403,13 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
|
||||
NamesAndTypesList all_columns = columns_desc.getAllPhysical();
|
||||
|
||||
NameSet updated_columns;
|
||||
bool materialize_ttl_recalculate_only = materializeTTLRecalculateOnly(storage);
|
||||
for (const MutationCommand & command : commands)
|
||||
{
|
||||
if (command.type == MutationCommand::Type::UPDATE
|
||||
|| command.type == MutationCommand::Type::DELETE)
|
||||
materialize_ttl_recalculate_only = false;
|
||||
|
||||
for (const auto & kv : command.column_to_update_expression)
|
||||
{
|
||||
updated_columns.insert(kv.first);
|
||||
@ -569,7 +583,18 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
|
||||
else if (command.type == MutationCommand::MATERIALIZE_TTL)
|
||||
{
|
||||
mutation_kind.set(MutationKind::MUTATE_OTHER);
|
||||
if (metadata_snapshot->hasRowsTTL())
|
||||
if (materialize_ttl_recalculate_only)
|
||||
{
|
||||
// just recalculate ttl_infos without remove expired data
|
||||
auto all_columns_vec = all_columns.getNames();
|
||||
auto new_dependencies = metadata_snapshot->getColumnDependencies(NameSet(all_columns_vec.begin(), all_columns_vec.end()), false);
|
||||
for (const auto & dependency : new_dependencies)
|
||||
{
|
||||
if (dependency.kind == ColumnDependency::TTL_EXPRESSION)
|
||||
dependencies.insert(dependency);
|
||||
}
|
||||
}
|
||||
else if (metadata_snapshot->hasRowsTTL())
|
||||
{
|
||||
for (const auto & column : all_columns)
|
||||
dependencies.emplace(column.name, ColumnDependency::TTL_TARGET);
|
||||
@ -594,19 +619,19 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
|
||||
}
|
||||
|
||||
/// Recalc only skip indices and projections of columns which could be updated by TTL.
|
||||
auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns);
|
||||
auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns, true);
|
||||
for (const auto & dependency : new_dependencies)
|
||||
{
|
||||
if (dependency.kind == ColumnDependency::SKIP_INDEX || dependency.kind == ColumnDependency::PROJECTION)
|
||||
dependencies.insert(dependency);
|
||||
}
|
||||
}
|
||||
|
||||
if (dependencies.empty())
|
||||
{
|
||||
/// Very rare case. It can happen if we have only one MOVE TTL with constant expression.
|
||||
/// But we still have to read at least one column.
|
||||
dependencies.emplace(all_columns.front().name, ColumnDependency::TTL_EXPRESSION);
|
||||
}
|
||||
if (dependencies.empty())
|
||||
{
|
||||
/// Very rare case. It can happen if we have only one MOVE TTL with constant expression.
|
||||
/// But we still have to read at least one column.
|
||||
dependencies.emplace(all_columns.front().name, ColumnDependency::TTL_EXPRESSION);
|
||||
}
|
||||
}
|
||||
else if (command.type == MutationCommand::READ_COLUMN)
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <Storages/MergeTree/MergeTreeDataWriter.h>
|
||||
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
|
||||
#include <DataStreams/TTLBlockInputStream.h>
|
||||
#include <DataStreams/TTLCalcInputStream.h>
|
||||
#include <DataStreams/DistinctSortedBlockInputStream.h>
|
||||
#include <DataStreams/ExpressionBlockInputStream.h>
|
||||
#include <DataStreams/MaterializingBlockInputStream.h>
|
||||
@ -1288,10 +1289,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
|
||||
auto mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension(new_data_part->getType())
|
||||
: getNonAdaptiveMrkExtension();
|
||||
bool need_sync = needSyncPart(source_part->rows_count, source_part->getBytesOnDisk(), *data_settings);
|
||||
bool need_remove_expired_values = false;
|
||||
auto execute_ttl_type = ExecuteTTLType::NONE;
|
||||
|
||||
if (in && shouldExecuteTTL(metadata_snapshot, interpreter->getColumnDependencies(), commands_for_part))
|
||||
need_remove_expired_values = true;
|
||||
if (in)
|
||||
execute_ttl_type = shouldExecuteTTL(metadata_snapshot, interpreter->getColumnDependencies());
|
||||
|
||||
/// All columns from part are changed and may be some more that were missing before in part
|
||||
/// TODO We can materialize compact part without copying data
|
||||
@ -1319,7 +1320,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
|
||||
time_of_mutation,
|
||||
compression_codec,
|
||||
merge_entry,
|
||||
need_remove_expired_values,
|
||||
execute_ttl_type,
|
||||
need_sync,
|
||||
space_reservation,
|
||||
holder,
|
||||
@ -1356,7 +1357,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
|
||||
return data.cloneAndLoadDataPartOnSameDisk(source_part, "tmp_clone_", future_part.part_info, metadata_snapshot);
|
||||
}
|
||||
|
||||
if (need_remove_expired_values)
|
||||
if (execute_ttl_type != ExecuteTTLType::NONE)
|
||||
files_to_skip.insert("ttl.txt");
|
||||
|
||||
disk->createDirectories(new_part_tmp_path);
|
||||
@ -1416,7 +1417,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
|
||||
time_of_mutation,
|
||||
compression_codec,
|
||||
merge_entry,
|
||||
need_remove_expired_values,
|
||||
execute_ttl_type,
|
||||
need_sync,
|
||||
space_reservation,
|
||||
holder,
|
||||
@ -1437,7 +1438,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
|
||||
}
|
||||
}
|
||||
|
||||
finalizeMutatedPart(source_part, new_data_part, need_remove_expired_values, compression_codec);
|
||||
finalizeMutatedPart(source_part, new_data_part, execute_ttl_type, compression_codec);
|
||||
}
|
||||
|
||||
return new_data_part;
|
||||
@ -1984,21 +1985,22 @@ std::set<MergeTreeProjectionPtr> MergeTreeDataMergerMutator::getProjectionsToRec
|
||||
return projections_to_recalc;
|
||||
}
|
||||
|
||||
bool MergeTreeDataMergerMutator::shouldExecuteTTL(
|
||||
const StorageMetadataPtr & metadata_snapshot, const ColumnDependencies & dependencies, const MutationCommands & commands)
|
||||
ExecuteTTLType MergeTreeDataMergerMutator::shouldExecuteTTL(const StorageMetadataPtr & metadata_snapshot, const ColumnDependencies & dependencies)
|
||||
{
|
||||
if (!metadata_snapshot->hasAnyTTL())
|
||||
return false;
|
||||
return ExecuteTTLType::NONE;
|
||||
|
||||
for (const auto & command : commands)
|
||||
if (command.type == MutationCommand::MATERIALIZE_TTL)
|
||||
return true;
|
||||
bool has_ttl_expression = false;
|
||||
|
||||
for (const auto & dependency : dependencies)
|
||||
if (dependency.kind == ColumnDependency::TTL_EXPRESSION || dependency.kind == ColumnDependency::TTL_TARGET)
|
||||
return true;
|
||||
{
|
||||
if (dependency.kind == ColumnDependency::TTL_EXPRESSION)
|
||||
has_ttl_expression = true;
|
||||
|
||||
return false;
|
||||
if (dependency.kind == ColumnDependency::TTL_TARGET)
|
||||
return ExecuteTTLType::NORMAL;
|
||||
}
|
||||
return has_ttl_expression ? ExecuteTTLType::RECALCULATE : ExecuteTTLType::NONE;
|
||||
}
|
||||
|
||||
// 1. get projection pipeline and a sink to write parts
|
||||
@ -2172,7 +2174,7 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns(
|
||||
time_t time_of_mutation,
|
||||
const CompressionCodecPtr & compression_codec,
|
||||
MergeListEntry & merge_entry,
|
||||
bool need_remove_expired_values,
|
||||
ExecuteTTLType execute_ttl_type,
|
||||
bool need_sync,
|
||||
const ReservationPtr & space_reservation,
|
||||
TableLockHolder & holder,
|
||||
@ -2185,9 +2187,12 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns(
|
||||
mutating_stream = std::make_shared<MaterializingBlockInputStream>(
|
||||
std::make_shared<ExpressionBlockInputStream>(mutating_stream, data.getPrimaryKeyAndSkipIndicesExpression(metadata_snapshot)));
|
||||
|
||||
if (need_remove_expired_values)
|
||||
if (execute_ttl_type == ExecuteTTLType::NORMAL)
|
||||
mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, metadata_snapshot, new_data_part, time_of_mutation, true);
|
||||
|
||||
if (execute_ttl_type == ExecuteTTLType::RECALCULATE)
|
||||
mutating_stream = std::make_shared<TTLCalcInputStream>(mutating_stream, data, metadata_snapshot, new_data_part, time_of_mutation, true);
|
||||
|
||||
IMergeTreeDataPart::MinMaxIndex minmax_idx;
|
||||
|
||||
MergedBlockOutputStream out{
|
||||
@ -2229,7 +2234,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
|
||||
time_t time_of_mutation,
|
||||
const CompressionCodecPtr & compression_codec,
|
||||
MergeListEntry & merge_entry,
|
||||
bool need_remove_expired_values,
|
||||
ExecuteTTLType execute_ttl_type,
|
||||
bool need_sync,
|
||||
const ReservationPtr & space_reservation,
|
||||
TableLockHolder & holder,
|
||||
@ -2238,9 +2243,12 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
|
||||
if (mutating_stream == nullptr)
|
||||
throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (need_remove_expired_values)
|
||||
if (execute_ttl_type == ExecuteTTLType::NORMAL)
|
||||
mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, metadata_snapshot, new_data_part, time_of_mutation, true);
|
||||
|
||||
if (execute_ttl_type == ExecuteTTLType::RECALCULATE)
|
||||
mutating_stream = std::make_shared<TTLCalcInputStream>(mutating_stream, data, metadata_snapshot, new_data_part, time_of_mutation, true);
|
||||
|
||||
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
|
||||
MergedColumnOnlyOutputStream out(
|
||||
new_data_part,
|
||||
@ -2279,7 +2287,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
|
||||
void MergeTreeDataMergerMutator::finalizeMutatedPart(
|
||||
const MergeTreeDataPartPtr & source_part,
|
||||
MergeTreeData::MutableDataPartPtr new_data_part,
|
||||
bool need_remove_expired_values,
|
||||
ExecuteTTLType execute_ttl_type,
|
||||
const CompressionCodecPtr & codec)
|
||||
{
|
||||
auto disk = new_data_part->volume->getDisk();
|
||||
@ -2293,7 +2301,7 @@ void MergeTreeDataMergerMutator::finalizeMutatedPart(
|
||||
new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_hash = out_hashing.getHash();
|
||||
}
|
||||
|
||||
if (need_remove_expired_values)
|
||||
if (execute_ttl_type != ExecuteTTLType::NONE)
|
||||
{
|
||||
/// Write a file with ttl infos in json format.
|
||||
auto out_ttl = disk->writeFile(fs::path(new_data_part->getFullRelativePath()) / "ttl.txt", 4096);
|
||||
|
@ -23,6 +23,13 @@ enum class SelectPartsDecision
|
||||
NOTHING_TO_MERGE = 2,
|
||||
};
|
||||
|
||||
enum class ExecuteTTLType
|
||||
{
|
||||
NONE = 0,
|
||||
NORMAL = 1,
|
||||
RECALCULATE= 2,
|
||||
};
|
||||
|
||||
/// Auxiliary struct holding metainformation for the future merged or mutated part.
|
||||
struct FutureMergedMutatedPart
|
||||
{
|
||||
@ -200,8 +207,7 @@ private:
|
||||
const ProjectionsDescription & all_projections,
|
||||
const MutationCommands & commands_for_removes);
|
||||
|
||||
static bool shouldExecuteTTL(
|
||||
const StorageMetadataPtr & metadata_snapshot, const ColumnDependencies & dependencies, const MutationCommands & commands);
|
||||
static ExecuteTTLType shouldExecuteTTL(const StorageMetadataPtr & metadata_snapshot, const ColumnDependencies & dependencies);
|
||||
|
||||
/// Return set of indices which should be recalculated during mutation also
|
||||
/// wraps input stream into additional expression stream
|
||||
@ -242,7 +248,7 @@ private:
|
||||
time_t time_of_mutation,
|
||||
const CompressionCodecPtr & compression_codec,
|
||||
MergeListEntry & merge_entry,
|
||||
bool need_remove_expired_values,
|
||||
ExecuteTTLType execute_ttl_type,
|
||||
bool need_sync,
|
||||
const ReservationPtr & space_reservation,
|
||||
TableLockHolder & holder,
|
||||
@ -260,7 +266,7 @@ private:
|
||||
time_t time_of_mutation,
|
||||
const CompressionCodecPtr & compression_codec,
|
||||
MergeListEntry & merge_entry,
|
||||
bool need_remove_expired_values,
|
||||
ExecuteTTLType execute_ttl_type,
|
||||
bool need_sync,
|
||||
const ReservationPtr & space_reservation,
|
||||
TableLockHolder & holder,
|
||||
@ -271,7 +277,7 @@ private:
|
||||
static void finalizeMutatedPart(
|
||||
const MergeTreeDataPartPtr & source_part,
|
||||
MergeTreeData::MutableDataPartPtr new_data_part,
|
||||
bool need_remove_expired_values,
|
||||
ExecuteTTLType execute_ttl_type,
|
||||
const CompressionCodecPtr & codec);
|
||||
|
||||
public :
|
||||
|
@ -117,6 +117,7 @@ struct Settings;
|
||||
M(Int64, merge_with_ttl_timeout, 3600 * 4, "Minimal time in seconds, when merge with delete TTL can be repeated.", 0) \
|
||||
M(Int64, merge_with_recompression_ttl_timeout, 3600 * 4, "Minimal time in seconds, when merge with recompression TTL can be repeated.", 0) \
|
||||
M(Bool, ttl_only_drop_parts, false, "Only drop altogether the expired parts and not partially prune them.", 0) \
|
||||
M(Bool, materialize_ttl_recalculate_only, false, "Only recalculate ttl info when MATERIALIZE TTL", 0) \
|
||||
M(Bool, write_final_mark, true, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)", 0) \
|
||||
M(Bool, enable_mixed_granularity_parts, true, "Enable parts with adaptive and non adaptive granularity", 0) \
|
||||
M(MaxThreads, max_part_loading_threads, 0, "The number of threads to load data parts at startup.", 0) \
|
||||
|
@ -73,6 +73,11 @@ public:
|
||||
return storage.getPartitionIDFromQuery(ast, context);
|
||||
}
|
||||
|
||||
bool materializeTTLRecalculateOnly() const
|
||||
{
|
||||
return parts.front()->storage.getSettings()->materialize_ttl_recalculate_only;
|
||||
}
|
||||
|
||||
protected:
|
||||
/// Used in part mutation.
|
||||
StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
|
||||
|
@ -214,7 +214,7 @@ bool StorageInMemoryMetadata::hasAnyGroupByTTL() const
|
||||
return !table_ttl.group_by_ttl.empty();
|
||||
}
|
||||
|
||||
ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(const NameSet & updated_columns) const
|
||||
ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(const NameSet & updated_columns, bool include_ttl_target) const
|
||||
{
|
||||
if (updated_columns.empty())
|
||||
return {};
|
||||
@ -250,7 +250,7 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(const NameSet
|
||||
if (hasRowsTTL())
|
||||
{
|
||||
auto rows_expression = getRowsTTL().expression;
|
||||
if (add_dependent_columns(rows_expression, required_ttl_columns))
|
||||
if (add_dependent_columns(rows_expression, required_ttl_columns) && include_ttl_target)
|
||||
{
|
||||
/// Filter all columns, if rows TTL expression have to be recalculated.
|
||||
for (const auto & column : getColumns().getAllPhysical())
|
||||
@ -263,13 +263,15 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(const NameSet
|
||||
|
||||
for (const auto & [name, entry] : getColumnTTLs())
|
||||
{
|
||||
if (add_dependent_columns(entry.expression, required_ttl_columns))
|
||||
if (add_dependent_columns(entry.expression, required_ttl_columns) && include_ttl_target)
|
||||
updated_ttl_columns.insert(name);
|
||||
}
|
||||
|
||||
for (const auto & entry : getMoveTTLs())
|
||||
add_dependent_columns(entry.expression, required_ttl_columns);
|
||||
|
||||
//TODO what about rows_where_ttl and group_by_ttl ??
|
||||
|
||||
for (const auto & column : indices_columns)
|
||||
res.emplace(column, ColumnDependency::SKIP_INDEX);
|
||||
for (const auto & column : projections_columns)
|
||||
|
@ -143,7 +143,7 @@ struct StorageInMemoryMetadata
|
||||
|
||||
/// Returns columns, which will be needed to calculate dependencies (skip
|
||||
/// indices, TTL expressions) if we update @updated_columns set of columns.
|
||||
ColumnDependencies getColumnDependencies(const NameSet & updated_columns) const;
|
||||
ColumnDependencies getColumnDependencies(const NameSet & updated_columns, bool include_ttl_target) const;
|
||||
|
||||
/// Block with ordinary + materialized columns.
|
||||
Block getSampleBlock() const;
|
||||
|
@ -0,0 +1,68 @@
|
||||
2000-10-10 1
|
||||
2000-10-10 2
|
||||
2100-10-10 3
|
||||
2100-10-10 4
|
||||
2000-10-11 00:00:00 2000-10-11 00:00:00
|
||||
2000-10-11 00:00:00 2000-10-11 00:00:00
|
||||
2100-10-11 00:00:00 2100-10-11 00:00:00
|
||||
2100-10-11 00:00:00 2100-10-11 00:00:00
|
||||
2100-10-10 3
|
||||
2100-10-10 4
|
||||
=============
|
||||
1 a
|
||||
2 b
|
||||
3 c
|
||||
4 d
|
||||
2000-01-01 00:00:00 2100-01-01 00:00:00
|
||||
1 a
|
||||
3 c
|
||||
=============
|
||||
1 a
|
||||
3 c
|
||||
2000-01-01 00:00:00 2000-01-01 00:00:00
|
||||
=============
|
||||
1 a
|
||||
2 b
|
||||
3 c
|
||||
4 d
|
||||
1 a
|
||||
2
|
||||
3 c
|
||||
4
|
||||
=============
|
||||
1 a
|
||||
2
|
||||
3 c
|
||||
4
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
=============
|
||||
1 a
|
||||
2 b
|
||||
3 c
|
||||
4 d
|
||||
2000-01-01 00:00:00 2100-01-01 00:00:00
|
||||
1 a
|
||||
2 b
|
||||
4 d
|
||||
=============
|
||||
1 a
|
||||
2 b
|
||||
4 d
|
||||
1
|
||||
2
|
||||
4 d
|
||||
=============
|
||||
1 a aa
|
||||
2 b bb
|
||||
3 c cc
|
||||
4 d dd
|
||||
1 a
|
||||
2 b bb
|
||||
3 cc
|
||||
4 d
|
||||
1
|
||||
=============
|
||||
0
|
107
tests/queries/0_stateless/01070_modify_ttl_recalc_only.sql
Normal file
107
tests/queries/0_stateless/01070_modify_ttl_recalc_only.sql
Normal file
@ -0,0 +1,107 @@
|
||||
set mutations_sync = 2;
|
||||
|
||||
drop table if exists ttl;
|
||||
|
||||
create table ttl (d Date, a Int) engine = MergeTree order by a partition by toDayOfMonth(d)
|
||||
SETTINGS max_number_of_merges_with_ttl_in_pool=0,materialize_ttl_recalculate_only=true;
|
||||
|
||||
insert into ttl values (toDateTime('2000-10-10 00:00:00'), 1);
|
||||
insert into ttl values (toDateTime('2000-10-10 00:00:00'), 2);
|
||||
insert into ttl values (toDateTime('2100-10-10 00:00:00'), 3);
|
||||
insert into ttl values (toDateTime('2100-10-10 00:00:00'), 4);
|
||||
|
||||
|
||||
alter table ttl modify ttl d + interval 1 day;
|
||||
select * from ttl order by a;
|
||||
select delete_ttl_info_min, delete_ttl_info_max from system.parts where database = currentDatabase() and table = 'ttl' and active > 0 order by name asc;
|
||||
optimize table ttl final;
|
||||
select * from ttl order by a;
|
||||
select '=============';
|
||||
|
||||
drop table if exists ttl;
|
||||
|
||||
create table ttl (i Int, s String) engine = MergeTree order by i
|
||||
SETTINGS max_number_of_merges_with_ttl_in_pool=0,materialize_ttl_recalculate_only=true;
|
||||
|
||||
insert into ttl values (1, 'a') (2, 'b') (3, 'c') (4, 'd');
|
||||
|
||||
alter table ttl modify ttl i % 2 = 0 ? toDate('2000-01-01') : toDate('2100-01-01');
|
||||
select * from ttl order by i;
|
||||
select delete_ttl_info_min, delete_ttl_info_max from system.parts where database = currentDatabase() and table = 'ttl' and active > 0;
|
||||
optimize table ttl final;
|
||||
select * from ttl order by i;
|
||||
select '=============';
|
||||
|
||||
alter table ttl modify ttl toDate('2000-01-01');
|
||||
select * from ttl order by i;
|
||||
select delete_ttl_info_min, delete_ttl_info_max from system.parts where database = currentDatabase() and table = 'ttl' and active > 0;
|
||||
optimize table ttl final;
|
||||
select * from ttl order by i;
|
||||
select '=============';
|
||||
|
||||
drop table if exists ttl;
|
||||
|
||||
create table ttl (i Int, s String) engine = MergeTree order by i
|
||||
SETTINGS max_number_of_merges_with_ttl_in_pool=0,materialize_ttl_recalculate_only=true;
|
||||
|
||||
insert into ttl values (1, 'a') (2, 'b') (3, 'c') (4, 'd');
|
||||
|
||||
alter table ttl modify column s String ttl i % 2 = 0 ? today() - 10 : toDate('2100-01-01');
|
||||
select * from ttl order by i;
|
||||
optimize table ttl final;
|
||||
select * from ttl order by i;
|
||||
select '=============';
|
||||
|
||||
alter table ttl modify column s String ttl toDate('2000-01-01');
|
||||
select * from ttl order by i;
|
||||
optimize table ttl final;
|
||||
select * from ttl order by i;
|
||||
select '=============';
|
||||
|
||||
drop table if exists ttl;
|
||||
|
||||
create table ttl (d Date, i Int, s String) engine = MergeTree order by i
|
||||
SETTINGS max_number_of_merges_with_ttl_in_pool=0,materialize_ttl_recalculate_only=true;
|
||||
|
||||
insert into ttl values (toDate('2000-01-02'), 1, 'a') (toDate('2000-01-03'), 2, 'b') (toDate('2080-01-01'), 3, 'c') (toDate('2080-01-03'), 4, 'd');
|
||||
|
||||
alter table ttl modify ttl i % 3 = 0 ? toDate('2000-01-01') : toDate('2100-01-01');
|
||||
select i, s from ttl order by i;
|
||||
select delete_ttl_info_min, delete_ttl_info_max from system.parts where database = currentDatabase() and table = 'ttl' and active > 0;
|
||||
optimize table ttl final;
|
||||
select i, s from ttl order by i;
|
||||
select '=============';
|
||||
|
||||
alter table ttl modify column s String ttl d + interval 1 month;
|
||||
select i, s from ttl order by i;
|
||||
optimize table ttl final;
|
||||
select i, s from ttl order by i;
|
||||
select '=============';
|
||||
|
||||
drop table if exists ttl;
|
||||
|
||||
create table ttl (i Int, s String, t String) engine = MergeTree order by i
|
||||
SETTINGS max_number_of_merges_with_ttl_in_pool=0,materialize_ttl_recalculate_only=true;
|
||||
|
||||
insert into ttl values (1, 'a', 'aa') (2, 'b', 'bb') (3, 'c', 'cc') (4, 'd', 'dd');
|
||||
|
||||
alter table ttl modify column s String ttl i % 3 = 0 ? today() - 10 : toDate('2100-01-01'),
|
||||
modify column t String ttl i % 3 = 1 ? today() - 10 : toDate('2100-01-01');
|
||||
|
||||
select i, s, t from ttl order by i;
|
||||
optimize table ttl final;
|
||||
select i, s, t from ttl order by i;
|
||||
-- MATERIALIZE TTL ran only once
|
||||
select count() from system.mutations where database = currentDatabase() and table = 'ttl' and is_done;
|
||||
select '=============';
|
||||
|
||||
drop table if exists ttl;
|
||||
|
||||
-- Nothing changed, don't run mutation
|
||||
create table ttl (i Int, s String ttl toDate('2000-01-02')) engine = MergeTree order by i
|
||||
SETTINGS max_number_of_merges_with_ttl_in_pool=0,materialize_ttl_recalculate_only=true;
|
||||
|
||||
alter table ttl modify column s String ttl toDate('2000-01-02');
|
||||
select count() from system.mutations where database = currentDatabase() and table = 'ttl' and is_done;
|
||||
|
||||
drop table if exists ttl;
|
Loading…
Reference in New Issue
Block a user