mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge pull request #57430 from ClickHouse/non-ready-set-ttl
Non ready set in TTL WHERE.
This commit is contained in:
commit
862c6cd79c
@ -77,6 +77,7 @@ public:
|
||||
const DataTypes & getElementsTypes() const { return set_elements_types; }
|
||||
|
||||
bool hasExplicitSetElements() const { return fill_set_elements || (!set_elements.empty() && set_elements.front()->size() == data.getTotalRowCount()); }
|
||||
bool hasSetElements() const { return !set_elements.empty(); }
|
||||
Columns getSetElements() const { checkIsCreated(); return { set_elements.begin(), set_elements.end() }; }
|
||||
|
||||
void checkColumnsNumber(size_t num_key_columns) const;
|
||||
|
@ -157,6 +157,34 @@ void addCreatingSetsStep(QueryPlan & query_plan, PreparedSets::Subqueries subque
|
||||
query_plan.unitePlans(std::move(creating_sets), std::move(plans));
|
||||
}
|
||||
|
||||
QueryPipelineBuilderPtr addCreatingSetsTransform(QueryPipelineBuilderPtr pipeline, PreparedSets::Subqueries subqueries, ContextPtr context)
|
||||
{
|
||||
DataStreams input_streams;
|
||||
input_streams.emplace_back(DataStream{pipeline->getHeader()});
|
||||
|
||||
QueryPipelineBuilders pipelines;
|
||||
pipelines.reserve(1 + subqueries.size());
|
||||
pipelines.push_back(std::move(pipeline));
|
||||
|
||||
auto plan_settings = QueryPlanOptimizationSettings::fromContext(context);
|
||||
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(context);
|
||||
|
||||
for (auto & future_set : subqueries)
|
||||
{
|
||||
if (future_set->get())
|
||||
continue;
|
||||
|
||||
auto plan = future_set->build(context);
|
||||
if (!plan)
|
||||
continue;
|
||||
|
||||
input_streams.emplace_back(plan->getCurrentDataStream());
|
||||
pipelines.emplace_back(plan->buildQueryPipeline(plan_settings, pipeline_settings));
|
||||
}
|
||||
|
||||
return CreatingSetsStep(input_streams).updatePipeline(std::move(pipelines), pipeline_settings);
|
||||
}
|
||||
|
||||
std::vector<std::unique_ptr<QueryPlan>> DelayedCreatingSetsStep::makePlansForSets(DelayedCreatingSetsStep && step)
|
||||
{
|
||||
std::vector<std::unique_ptr<QueryPlan>> plans;
|
||||
|
@ -72,4 +72,6 @@ void addCreatingSetsStep(QueryPlan & query_plan, PreparedSets::Subqueries subque
|
||||
|
||||
void addCreatingSetsStep(QueryPlan & query_plan, PreparedSetsPtr prepared_sets, ContextPtr context);
|
||||
|
||||
QueryPipelineBuilderPtr addCreatingSetsTransform(QueryPipelineBuilderPtr pipeline, PreparedSets::Subqueries subqueries, ContextPtr context);
|
||||
|
||||
}
|
||||
|
@ -11,8 +11,9 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
ITTLAlgorithm::ITTLAlgorithm(
|
||||
const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
|
||||
: description(description_)
|
||||
const TTLExpressions & ttl_expressions_, const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
|
||||
: ttl_expressions(ttl_expressions_)
|
||||
, description(description_)
|
||||
, old_ttl_info(old_ttl_info_)
|
||||
, current_time(current_time_)
|
||||
, force(force_)
|
||||
|
@ -8,6 +8,12 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct TTLExpressions
|
||||
{
|
||||
ExpressionActionsPtr expression;
|
||||
ExpressionActionsPtr where_expression;
|
||||
};
|
||||
|
||||
/**
|
||||
* Represents the actions, which are required to do
|
||||
* with data, when TTL is expired: delete, aggregate, etc.
|
||||
@ -18,7 +24,7 @@ public:
|
||||
using TTLInfo = IMergeTreeDataPart::TTLInfo;
|
||||
using MutableDataPartPtr = MergeTreeMutableDataPartPtr;
|
||||
|
||||
ITTLAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
|
||||
ITTLAlgorithm(const TTLExpressions & ttl_expressions_, const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
|
||||
virtual ~ITTLAlgorithm() = default;
|
||||
|
||||
virtual void execute(Block & block) = 0;
|
||||
@ -39,6 +45,7 @@ protected:
|
||||
bool isTTLExpired(time_t ttl) const;
|
||||
UInt32 getTimestampByIndex(const IColumn * column, size_t index) const;
|
||||
|
||||
const TTLExpressions ttl_expressions;
|
||||
const TTLDescription description;
|
||||
const TTLInfo old_ttl_info;
|
||||
const time_t current_time;
|
||||
|
@ -5,13 +5,14 @@ namespace DB
|
||||
{
|
||||
|
||||
TTLAggregationAlgorithm::TTLAggregationAlgorithm(
|
||||
const TTLExpressions & ttl_expressions_,
|
||||
const TTLDescription & description_,
|
||||
const TTLInfo & old_ttl_info_,
|
||||
time_t current_time_,
|
||||
bool force_,
|
||||
const Block & header_,
|
||||
const MergeTreeData & storage_)
|
||||
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
|
||||
: ITTLAlgorithm(ttl_expressions_, description_, old_ttl_info_, current_time_, force_)
|
||||
, header(header_)
|
||||
{
|
||||
current_key_value.resize(description.group_by_keys.size());
|
||||
@ -75,8 +76,8 @@ void TTLAggregationAlgorithm::execute(Block & block)
|
||||
const auto & column_names = header.getNames();
|
||||
MutableColumns aggregate_columns = header.cloneEmptyColumns();
|
||||
|
||||
auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
|
||||
auto where_column = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column);
|
||||
auto ttl_column = executeExpressionAndGetColumn(ttl_expressions.expression, block, description.result_column);
|
||||
auto where_column = executeExpressionAndGetColumn(ttl_expressions.where_expression, block, description.where_result_column);
|
||||
|
||||
size_t rows_aggregated = 0;
|
||||
size_t current_key_start = 0;
|
||||
@ -157,8 +158,8 @@ void TTLAggregationAlgorithm::execute(Block & block)
|
||||
/// If some rows were aggregated we have to recalculate ttl info's
|
||||
if (some_rows_were_aggregated)
|
||||
{
|
||||
auto ttl_column_after_aggregation = executeExpressionAndGetColumn(description.expression, block, description.result_column);
|
||||
auto where_column_after_aggregation = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column);
|
||||
auto ttl_column_after_aggregation = executeExpressionAndGetColumn(ttl_expressions.expression, block, description.result_column);
|
||||
auto where_column_after_aggregation = executeExpressionAndGetColumn(ttl_expressions.where_expression, block, description.where_result_column);
|
||||
for (size_t i = 0; i < block.rows(); ++i)
|
||||
{
|
||||
bool where_filter_passed = !where_column_after_aggregation || where_column_after_aggregation->getBool(i);
|
||||
|
@ -13,6 +13,7 @@ class TTLAggregationAlgorithm final : public ITTLAlgorithm
|
||||
{
|
||||
public:
|
||||
TTLAggregationAlgorithm(
|
||||
const TTLExpressions & ttl_expressions_,
|
||||
const TTLDescription & description_,
|
||||
const TTLInfo & old_ttl_info_,
|
||||
time_t current_time_,
|
||||
|
@ -4,6 +4,7 @@ namespace DB
|
||||
{
|
||||
|
||||
TTLColumnAlgorithm::TTLColumnAlgorithm(
|
||||
const TTLExpressions & ttl_expressions_,
|
||||
const TTLDescription & description_,
|
||||
const TTLInfo & old_ttl_info_,
|
||||
time_t current_time_,
|
||||
@ -12,7 +13,7 @@ TTLColumnAlgorithm::TTLColumnAlgorithm(
|
||||
const ExpressionActionsPtr & default_expression_,
|
||||
const String & default_column_name_,
|
||||
bool is_compact_part_)
|
||||
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
|
||||
: ITTLAlgorithm(ttl_expressions_, description_, old_ttl_info_, current_time_, force_)
|
||||
, column_name(column_name_)
|
||||
, default_expression(default_expression_)
|
||||
, default_column_name(default_column_name_)
|
||||
@ -49,7 +50,7 @@ void TTLColumnAlgorithm::execute(Block & block)
|
||||
if (default_column)
|
||||
default_column = default_column->convertToFullColumnIfConst();
|
||||
|
||||
auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
|
||||
auto ttl_column = executeExpressionAndGetColumn(ttl_expressions.expression, block, description.result_column);
|
||||
|
||||
auto & column_with_type = block.getByName(column_name);
|
||||
const IColumn * values_column = column_with_type.column.get();
|
||||
|
@ -11,6 +11,7 @@ class TTLColumnAlgorithm final : public ITTLAlgorithm
|
||||
{
|
||||
public:
|
||||
TTLColumnAlgorithm(
|
||||
const TTLExpressions & ttl_expressions_,
|
||||
const TTLDescription & description_,
|
||||
const TTLInfo & old_ttl_info_,
|
||||
time_t current_time_,
|
||||
|
@ -4,8 +4,8 @@ namespace DB
|
||||
{
|
||||
|
||||
TTLDeleteAlgorithm::TTLDeleteAlgorithm(
|
||||
const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
|
||||
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
|
||||
const TTLExpressions & ttl_expressions_, const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
|
||||
: ITTLAlgorithm(ttl_expressions_, description_, old_ttl_info_, current_time_, force_)
|
||||
{
|
||||
if (!isMinTTLExpired())
|
||||
new_ttl_info = old_ttl_info;
|
||||
@ -19,8 +19,8 @@ void TTLDeleteAlgorithm::execute(Block & block)
|
||||
if (!block || !isMinTTLExpired())
|
||||
return;
|
||||
|
||||
auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
|
||||
auto where_column = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column);
|
||||
auto ttl_column = executeExpressionAndGetColumn(ttl_expressions.expression, block, description.result_column);
|
||||
auto where_column = executeExpressionAndGetColumn(ttl_expressions.where_expression, block, description.where_result_column);
|
||||
|
||||
MutableColumns result_columns;
|
||||
const auto & column_names = block.getNames();
|
||||
@ -54,7 +54,7 @@ void TTLDeleteAlgorithm::execute(Block & block)
|
||||
|
||||
void TTLDeleteAlgorithm::finalize(const MutableDataPartPtr & data_part) const
|
||||
{
|
||||
if (description.where_expression)
|
||||
if (ttl_expressions.where_expression)
|
||||
data_part->ttl_infos.rows_where_ttl[description.result_column] = new_ttl_info;
|
||||
else
|
||||
data_part->ttl_infos.table_ttl = new_ttl_info;
|
||||
|
@ -10,7 +10,7 @@ namespace DB
|
||||
class TTLDeleteAlgorithm final : public ITTLAlgorithm
|
||||
{
|
||||
public:
|
||||
TTLDeleteAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
|
||||
TTLDeleteAlgorithm(const TTLExpressions & ttl_expressions_, const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
|
||||
|
||||
void execute(Block & block) override;
|
||||
void finalize(const MutableDataPartPtr & data_part) const override;
|
||||
|
@ -4,13 +4,14 @@ namespace DB
|
||||
{
|
||||
|
||||
TTLUpdateInfoAlgorithm::TTLUpdateInfoAlgorithm(
|
||||
const TTLExpressions & ttl_expressions_,
|
||||
const TTLDescription & description_,
|
||||
const TTLUpdateField ttl_update_field_,
|
||||
const String ttl_update_key_,
|
||||
const TTLInfo & old_ttl_info_,
|
||||
time_t current_time_,
|
||||
bool force_)
|
||||
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
|
||||
: ITTLAlgorithm(ttl_expressions_, description_, old_ttl_info_, current_time_, force_)
|
||||
, ttl_update_field(ttl_update_field_)
|
||||
, ttl_update_key(ttl_update_key_)
|
||||
{
|
||||
@ -21,7 +22,7 @@ void TTLUpdateInfoAlgorithm::execute(Block & block)
|
||||
if (!block)
|
||||
return;
|
||||
|
||||
auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
|
||||
auto ttl_column = executeExpressionAndGetColumn(ttl_expressions.expression, block, description.result_column);
|
||||
for (size_t i = 0; i < block.rows(); ++i)
|
||||
{
|
||||
UInt32 cur_ttl = ITTLAlgorithm::getTimestampByIndex(ttl_column.get(), i);
|
||||
|
@ -20,6 +20,7 @@ class TTLUpdateInfoAlgorithm : public ITTLAlgorithm
|
||||
{
|
||||
public:
|
||||
TTLUpdateInfoAlgorithm(
|
||||
const TTLExpressions & ttl_expressions_,
|
||||
const TTLDescription & description_,
|
||||
const TTLUpdateField ttl_update_field_,
|
||||
const String ttl_update_key_,
|
||||
|
@ -4,7 +4,24 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
static TTLExpressions getExpressions(const TTLDescription & ttl_descr, PreparedSets::Subqueries & subqueries_for_sets, const ContextPtr & context)
|
||||
{
|
||||
auto expr = ttl_descr.buildExpression(context);
|
||||
auto expr_queries = expr.sets->getSubqueries();
|
||||
subqueries_for_sets.insert(subqueries_for_sets.end(), expr_queries.begin(), expr_queries.end());
|
||||
|
||||
auto where_expr = ttl_descr.buildWhereExpression(context);
|
||||
if (where_expr.sets)
|
||||
{
|
||||
auto where_expr_queries = where_expr.sets->getSubqueries();
|
||||
subqueries_for_sets.insert(subqueries_for_sets.end(), where_expr_queries.begin(), where_expr_queries.end());
|
||||
}
|
||||
|
||||
return {expr.expression, where_expr.expression};
|
||||
}
|
||||
|
||||
TTLCalcTransform::TTLCalcTransform(
|
||||
const ContextPtr & context,
|
||||
const Block & header_,
|
||||
const MergeTreeData & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
@ -21,33 +38,39 @@ TTLCalcTransform::TTLCalcTransform(
|
||||
{
|
||||
const auto & rows_ttl = metadata_snapshot_->getRowsTTL();
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
rows_ttl, TTLUpdateField::TABLE_TTL, rows_ttl.result_column, old_ttl_infos.table_ttl, current_time_, force_));
|
||||
getExpressions(rows_ttl, subqueries_for_sets, context), rows_ttl,
|
||||
TTLUpdateField::TABLE_TTL, rows_ttl.result_column, old_ttl_infos.table_ttl, current_time_, force_));
|
||||
}
|
||||
|
||||
for (const auto & where_ttl : metadata_snapshot_->getRowsWhereTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
where_ttl, TTLUpdateField::ROWS_WHERE_TTL, where_ttl.result_column, old_ttl_infos.rows_where_ttl[where_ttl.result_column], current_time_, force_));
|
||||
getExpressions(where_ttl, subqueries_for_sets, context), where_ttl,
|
||||
TTLUpdateField::ROWS_WHERE_TTL, where_ttl.result_column, old_ttl_infos.rows_where_ttl[where_ttl.result_column], current_time_, force_));
|
||||
|
||||
for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
group_by_ttl, TTLUpdateField::GROUP_BY_TTL, group_by_ttl.result_column, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_));
|
||||
getExpressions(group_by_ttl, subqueries_for_sets, context), group_by_ttl,
|
||||
TTLUpdateField::GROUP_BY_TTL, group_by_ttl.result_column, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_));
|
||||
|
||||
if (metadata_snapshot_->hasAnyColumnTTL())
|
||||
{
|
||||
for (const auto & [name, description] : metadata_snapshot_->getColumnTTLs())
|
||||
{
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
description, TTLUpdateField::COLUMNS_TTL, name, old_ttl_infos.columns_ttl[name], current_time_, force_));
|
||||
getExpressions(description, subqueries_for_sets, context), description,
|
||||
TTLUpdateField::COLUMNS_TTL, name, old_ttl_infos.columns_ttl[name], current_time_, force_));
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & move_ttl : metadata_snapshot_->getMoveTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
move_ttl, TTLUpdateField::MOVES_TTL, move_ttl.result_column, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
|
||||
getExpressions(move_ttl, subqueries_for_sets, context), move_ttl,
|
||||
TTLUpdateField::MOVES_TTL, move_ttl.result_column, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
|
||||
|
||||
for (const auto & recompression_ttl : metadata_snapshot_->getRecompressionTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
|
||||
getExpressions(recompression_ttl, subqueries_for_sets, context), recompression_ttl,
|
||||
TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
|
||||
}
|
||||
|
||||
void TTLCalcTransform::consume(Chunk chunk)
|
||||
|
@ -15,6 +15,7 @@ class TTLCalcTransform : public IAccumulatingTransform
|
||||
{
|
||||
public:
|
||||
TTLCalcTransform(
|
||||
const ContextPtr & context,
|
||||
const Block & header_,
|
||||
const MergeTreeData & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
@ -23,6 +24,8 @@ public:
|
||||
bool force_
|
||||
);
|
||||
|
||||
PreparedSets::Subqueries getSubqueries() { return std::move(subqueries_for_sets); }
|
||||
|
||||
String getName() const override { return "TTL_CALC"; }
|
||||
Status prepare() override;
|
||||
|
||||
@ -35,6 +38,7 @@ protected:
|
||||
|
||||
private:
|
||||
std::vector<TTLAlgorithmPtr> algorithms;
|
||||
PreparedSets::Subqueries subqueries_for_sets;
|
||||
|
||||
/// ttl_infos and empty_columns are updating while reading
|
||||
const MergeTreeData::MutableDataPartPtr & data_part;
|
||||
|
@ -16,7 +16,24 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
static TTLExpressions getExpressions(const TTLDescription & ttl_descr, PreparedSets::Subqueries & subqueries_for_sets, const ContextPtr & context)
|
||||
{
|
||||
auto expr = ttl_descr.buildExpression(context);
|
||||
auto expr_queries = expr.sets->getSubqueries();
|
||||
subqueries_for_sets.insert(subqueries_for_sets.end(), expr_queries.begin(), expr_queries.end());
|
||||
|
||||
auto where_expr = ttl_descr.buildWhereExpression(context);
|
||||
if (where_expr.sets)
|
||||
{
|
||||
auto where_expr_queries = where_expr.sets->getSubqueries();
|
||||
subqueries_for_sets.insert(subqueries_for_sets.end(), where_expr_queries.begin(), where_expr_queries.end());
|
||||
}
|
||||
|
||||
return {expr.expression, where_expr.expression};
|
||||
}
|
||||
|
||||
TTLTransform::TTLTransform(
|
||||
const ContextPtr & context,
|
||||
const Block & header_,
|
||||
const MergeTreeData & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
@ -33,10 +50,11 @@ TTLTransform::TTLTransform(
|
||||
{
|
||||
const auto & rows_ttl = metadata_snapshot_->getRowsTTL();
|
||||
auto algorithm = std::make_unique<TTLDeleteAlgorithm>(
|
||||
rows_ttl, old_ttl_infos.table_ttl, current_time_, force_);
|
||||
getExpressions(rows_ttl, subqueries_for_sets, context), rows_ttl,
|
||||
old_ttl_infos.table_ttl, current_time_, force_);
|
||||
|
||||
/// Skip all data if table ttl is expired for part
|
||||
if (algorithm->isMaxTTLExpired() && !rows_ttl.where_expression)
|
||||
if (algorithm->isMaxTTLExpired() && !rows_ttl.where_expression_ast)
|
||||
all_data_dropped = true;
|
||||
|
||||
delete_algorithm = algorithm.get();
|
||||
@ -45,11 +63,13 @@ TTLTransform::TTLTransform(
|
||||
|
||||
for (const auto & where_ttl : metadata_snapshot_->getRowsWhereTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLDeleteAlgorithm>(
|
||||
where_ttl, old_ttl_infos.rows_where_ttl[where_ttl.result_column], current_time_, force_));
|
||||
getExpressions(where_ttl, subqueries_for_sets, context), where_ttl,
|
||||
old_ttl_infos.rows_where_ttl[where_ttl.result_column], current_time_, force_));
|
||||
|
||||
for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLAggregationAlgorithm>(
|
||||
group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_,
|
||||
getExpressions(group_by_ttl, subqueries_for_sets, context), group_by_ttl,
|
||||
old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_,
|
||||
getInputPort().getHeader(), storage_));
|
||||
|
||||
if (metadata_snapshot_->hasAnyColumnTTL())
|
||||
@ -75,18 +95,21 @@ TTLTransform::TTLTransform(
|
||||
}
|
||||
|
||||
algorithms.emplace_back(std::make_unique<TTLColumnAlgorithm>(
|
||||
description, old_ttl_infos.columns_ttl[name], current_time_,
|
||||
getExpressions(description, subqueries_for_sets, context), description,
|
||||
old_ttl_infos.columns_ttl[name], current_time_,
|
||||
force_, name, default_expression, default_column_name, isCompactPart(data_part)));
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & move_ttl : metadata_snapshot_->getMoveTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
move_ttl, TTLUpdateField::MOVES_TTL, move_ttl.result_column, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
|
||||
getExpressions(move_ttl, subqueries_for_sets, context), move_ttl,
|
||||
TTLUpdateField::MOVES_TTL, move_ttl.result_column, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
|
||||
|
||||
for (const auto & recompression_ttl : metadata_snapshot_->getRecompressionTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
|
||||
recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
|
||||
getExpressions(recompression_ttl, subqueries_for_sets, context), recompression_ttl,
|
||||
TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
|
||||
}
|
||||
|
||||
Block reorderColumns(Block block, const Block & header)
|
||||
|
@ -16,6 +16,7 @@ class TTLTransform : public IAccumulatingTransform
|
||||
{
|
||||
public:
|
||||
TTLTransform(
|
||||
const ContextPtr & context,
|
||||
const Block & header_,
|
||||
const MergeTreeData & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
@ -28,6 +29,8 @@ public:
|
||||
|
||||
Status prepare() override;
|
||||
|
||||
PreparedSets::Subqueries getSubqueries() { return std::move(subqueries_for_sets); }
|
||||
|
||||
protected:
|
||||
void consume(Chunk chunk) override;
|
||||
Chunk generate() override;
|
||||
@ -40,6 +43,8 @@ private:
|
||||
const TTLDeleteAlgorithm * delete_algorithm = nullptr;
|
||||
bool all_data_dropped = false;
|
||||
|
||||
PreparedSets::Subqueries subqueries_for_sets;
|
||||
|
||||
/// ttl_infos and empty_columns are updating while reading
|
||||
const MergeTreeData::MutableDataPartPtr & data_part;
|
||||
LoggerPtr log;
|
||||
|
@ -33,6 +33,9 @@
|
||||
#include <Processors/Transforms/TTLCalcTransform.h>
|
||||
#include <Processors/Transforms/DistinctSortedTransform.h>
|
||||
#include <Processors/Transforms/DistinctTransform.h>
|
||||
#include <Processors/QueryPlan/CreatingSetsStep.h>
|
||||
#include <Interpreters/PreparedSets.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -1056,13 +1059,14 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
|
||||
break;
|
||||
}
|
||||
|
||||
auto res_pipe = Pipe::unitePipes(std::move(pipes));
|
||||
res_pipe.addTransform(std::move(merged_transform));
|
||||
auto builder = std::make_unique<QueryPipelineBuilder>();
|
||||
builder->init(Pipe::unitePipes(std::move(pipes)));
|
||||
builder->addTransform(std::move(merged_transform));
|
||||
|
||||
#ifndef NDEBUG
|
||||
if (!sort_description.empty())
|
||||
{
|
||||
res_pipe.addSimpleTransform([&](const Block & header_)
|
||||
builder->addSimpleTransform([&](const Block & header_)
|
||||
{
|
||||
auto transform = std::make_shared<CheckSortedTransform>(header_, sort_description);
|
||||
return transform;
|
||||
@ -1084,26 +1088,34 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
|
||||
}
|
||||
|
||||
if (DistinctSortedTransform::isApplicable(header, sort_description, global_ctx->deduplicate_by_columns))
|
||||
res_pipe.addTransform(std::make_shared<DistinctSortedTransform>(
|
||||
res_pipe.getHeader(), sort_description, SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns));
|
||||
builder->addTransform(std::make_shared<DistinctSortedTransform>(
|
||||
builder->getHeader(), sort_description, SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns));
|
||||
else
|
||||
res_pipe.addTransform(std::make_shared<DistinctTransform>(
|
||||
res_pipe.getHeader(), SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns));
|
||||
builder->addTransform(std::make_shared<DistinctTransform>(
|
||||
builder->getHeader(), SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns));
|
||||
}
|
||||
|
||||
PreparedSets::Subqueries subqueries;
|
||||
|
||||
if (ctx->need_remove_expired_values)
|
||||
res_pipe.addTransform(std::make_shared<TTLTransform>(
|
||||
res_pipe.getHeader(), *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl));
|
||||
{
|
||||
auto transform = std::make_shared<TTLTransform>(global_ctx->context, builder->getHeader(), *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl);
|
||||
subqueries = transform->getSubqueries();
|
||||
builder->addTransform(std::move(transform));
|
||||
}
|
||||
|
||||
if (global_ctx->metadata_snapshot->hasSecondaryIndices())
|
||||
{
|
||||
const auto & indices = global_ctx->metadata_snapshot->getSecondaryIndices();
|
||||
res_pipe.addTransform(std::make_shared<ExpressionTransform>(
|
||||
res_pipe.getHeader(), indices.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())));
|
||||
res_pipe.addTransform(std::make_shared<MaterializingTransform>(res_pipe.getHeader()));
|
||||
builder->addTransform(std::make_shared<ExpressionTransform>(
|
||||
builder->getHeader(), indices.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())));
|
||||
builder->addTransform(std::make_shared<MaterializingTransform>(builder->getHeader()));
|
||||
}
|
||||
|
||||
global_ctx->merged_pipeline = QueryPipeline(std::move(res_pipe));
|
||||
if (!subqueries.empty())
|
||||
builder = addCreatingSetsTransform(std::move(builder), std::move(subqueries), global_ctx->context);
|
||||
|
||||
global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
|
||||
/// Dereference unique_ptr and pass horizontal_stage_progress by reference
|
||||
global_ctx->merged_pipeline.setProgressCallback(MergeProgressCallback(global_ctx->merge_list_element_ptr, global_ctx->watch_prev_elapsed, *global_ctx->horizontal_stage_progress));
|
||||
/// Is calculated inside MergeProgressCallback.
|
||||
|
@ -132,13 +132,18 @@ void buildScatterSelector(
|
||||
|
||||
/// Computes ttls and updates ttl infos
|
||||
void updateTTL(
|
||||
const ContextPtr context,
|
||||
const TTLDescription & ttl_entry,
|
||||
IMergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
DB::MergeTreeDataPartTTLInfo & ttl_info,
|
||||
const Block & block,
|
||||
bool update_part_min_max_ttls)
|
||||
{
|
||||
auto ttl_column = ITTLAlgorithm::executeExpressionAndGetColumn(ttl_entry.expression, block, ttl_entry.result_column);
|
||||
auto expr_and_set = ttl_entry.buildExpression(context);
|
||||
for (auto & subquery : expr_and_set.sets->getSubqueries())
|
||||
subquery->buildSetInplace(context);
|
||||
|
||||
auto ttl_column = ITTLAlgorithm::executeExpressionAndGetColumn(expr_and_set.expression, block, ttl_entry.result_column);
|
||||
|
||||
if (const ColumnUInt16 * column_date = typeid_cast<const ColumnUInt16 *>(ttl_column.get()))
|
||||
{
|
||||
@ -507,7 +512,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
|
||||
DB::IMergeTreeDataPart::TTLInfos move_ttl_infos;
|
||||
const auto & move_ttl_entries = metadata_snapshot->getMoveTTLs();
|
||||
for (const auto & ttl_entry : move_ttl_entries)
|
||||
updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
|
||||
updateTTL(context, ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
|
||||
|
||||
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(metadata_snapshot, expected_size, move_ttl_infos, time(nullptr), 0, true);
|
||||
VolumePtr volume = data.getStoragePolicy()->getVolume(0);
|
||||
@ -562,20 +567,20 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
|
||||
}
|
||||
|
||||
if (metadata_snapshot->hasRowsTTL())
|
||||
updateTTL(metadata_snapshot->getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);
|
||||
updateTTL(context, metadata_snapshot->getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);
|
||||
|
||||
for (const auto & ttl_entry : metadata_snapshot->getGroupByTTLs())
|
||||
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.group_by_ttl[ttl_entry.result_column], block, true);
|
||||
updateTTL(context, ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.group_by_ttl[ttl_entry.result_column], block, true);
|
||||
|
||||
for (const auto & ttl_entry : metadata_snapshot->getRowsWhereTTLs())
|
||||
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.rows_where_ttl[ttl_entry.result_column], block, true);
|
||||
updateTTL(context, ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.rows_where_ttl[ttl_entry.result_column], block, true);
|
||||
|
||||
for (const auto & [name, ttl_entry] : metadata_snapshot->getColumnTTLs())
|
||||
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.columns_ttl[name], block, true);
|
||||
updateTTL(context, ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.columns_ttl[name], block, true);
|
||||
|
||||
const auto & recompression_ttl_entries = metadata_snapshot->getRecompressionTTLs();
|
||||
for (const auto & ttl_entry : recompression_ttl_entries)
|
||||
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.recompression_ttl[ttl_entry.result_column], block, false);
|
||||
updateTTL(context, ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.recompression_ttl[ttl_entry.result_column], block, false);
|
||||
|
||||
new_data_part->ttl_infos.update(move_ttl_infos);
|
||||
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <Interpreters/SquashingTransform.h>
|
||||
#include <Interpreters/MergeTreeTransaction.h>
|
||||
#include <Interpreters/PreparedSets.h>
|
||||
#include <Processors/Transforms/TTLTransform.h>
|
||||
#include <Processors/Transforms/TTLCalcTransform.h>
|
||||
#include <Processors/Transforms/DistinctSortedTransform.h>
|
||||
@ -16,6 +17,7 @@
|
||||
#include <Processors/Transforms/ExpressionTransform.h>
|
||||
#include <Processors/Transforms/MaterializingTransform.h>
|
||||
#include <Processors/Executors/PullingPipelineExecutor.h>
|
||||
#include <Processors/QueryPlan/CreatingSetsStep.h>
|
||||
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataWriter.h>
|
||||
#include <Storages/MutationCommands.h>
|
||||
@ -1552,21 +1554,34 @@ private:
|
||||
if (!ctx->mutating_pipeline_builder.initialized())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot mutate part columns with uninitialized mutations stream. It's a bug");
|
||||
|
||||
QueryPipelineBuilder builder(std::move(ctx->mutating_pipeline_builder));
|
||||
auto builder = std::make_unique<QueryPipelineBuilder>(std::move(ctx->mutating_pipeline_builder));
|
||||
|
||||
if (ctx->metadata_snapshot->hasPrimaryKey() || ctx->metadata_snapshot->hasSecondaryIndices())
|
||||
{
|
||||
builder.addTransform(std::make_shared<ExpressionTransform>(
|
||||
builder.getHeader(), ctx->data->getPrimaryKeyAndSkipIndicesExpression(ctx->metadata_snapshot, skip_indices)));
|
||||
builder->addTransform(std::make_shared<ExpressionTransform>(
|
||||
builder->getHeader(), ctx->data->getPrimaryKeyAndSkipIndicesExpression(ctx->metadata_snapshot, skip_indices)));
|
||||
|
||||
builder.addTransform(std::make_shared<MaterializingTransform>(builder.getHeader()));
|
||||
builder->addTransform(std::make_shared<MaterializingTransform>(builder->getHeader()));
|
||||
}
|
||||
|
||||
PreparedSets::Subqueries subqueries;
|
||||
|
||||
if (ctx->execute_ttl_type == ExecuteTTLType::NORMAL)
|
||||
builder.addTransform(std::make_shared<TTLTransform>(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true));
|
||||
{
|
||||
auto transform = std::make_shared<TTLTransform>(ctx->context, builder->getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true);
|
||||
subqueries = transform->getSubqueries();
|
||||
builder->addTransform(std::move(transform));
|
||||
}
|
||||
|
||||
if (ctx->execute_ttl_type == ExecuteTTLType::RECALCULATE)
|
||||
builder.addTransform(std::make_shared<TTLCalcTransform>(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true));
|
||||
{
|
||||
auto transform = std::make_shared<TTLCalcTransform>(ctx->context, builder->getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true);
|
||||
subqueries = transform->getSubqueries();
|
||||
builder->addTransform(std::move(transform));
|
||||
}
|
||||
|
||||
if (!subqueries.empty())
|
||||
builder = addCreatingSetsTransform(std::move(builder), std::move(subqueries), ctx->context);
|
||||
|
||||
ctx->minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
|
||||
|
||||
@ -1600,7 +1615,7 @@ private:
|
||||
ctx->context->getWriteSettings(),
|
||||
computed_granularity);
|
||||
|
||||
ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
|
||||
ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
|
||||
ctx->mutating_pipeline.setProgressCallback(ctx->progress_callback);
|
||||
/// Is calculated inside MergeProgressCallback.
|
||||
ctx->mutating_pipeline.disableProfileEventUpdate();
|
||||
@ -1796,13 +1811,25 @@ private:
|
||||
|
||||
if (ctx->mutating_pipeline_builder.initialized())
|
||||
{
|
||||
QueryPipelineBuilder builder(std::move(ctx->mutating_pipeline_builder));
|
||||
auto builder = std::make_unique<QueryPipelineBuilder>(std::move(ctx->mutating_pipeline_builder));
|
||||
PreparedSets::Subqueries subqueries;
|
||||
|
||||
if (ctx->execute_ttl_type == ExecuteTTLType::NORMAL)
|
||||
builder.addTransform(std::make_shared<TTLTransform>(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true));
|
||||
{
|
||||
auto transform = std::make_shared<TTLTransform>(ctx->context, builder->getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true);
|
||||
subqueries = transform->getSubqueries();
|
||||
builder->addTransform(std::move(transform));
|
||||
}
|
||||
|
||||
if (ctx->execute_ttl_type == ExecuteTTLType::RECALCULATE)
|
||||
builder.addTransform(std::make_shared<TTLCalcTransform>(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true));
|
||||
{
|
||||
auto transform = std::make_shared<TTLCalcTransform>(ctx->context, builder->getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true);
|
||||
subqueries = transform->getSubqueries();
|
||||
builder->addTransform(std::move(transform));
|
||||
}
|
||||
|
||||
if (!subqueries.empty())
|
||||
builder = addCreatingSetsTransform(std::move(builder), std::move(subqueries), ctx->context);
|
||||
|
||||
ctx->out = std::make_shared<MergedColumnOnlyOutputStream>(
|
||||
ctx->new_data_part,
|
||||
@ -1816,7 +1843,7 @@ private:
|
||||
&ctx->source_part->index_granularity_info
|
||||
);
|
||||
|
||||
ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
|
||||
ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
|
||||
ctx->mutating_pipeline.setProgressCallback(ctx->progress_callback);
|
||||
/// Is calculated inside MergeProgressCallback.
|
||||
ctx->mutating_pipeline.disableProfileEventUpdate();
|
||||
|
@ -200,7 +200,7 @@ TTLDescription StorageInMemoryMetadata::getRowsTTL() const
|
||||
|
||||
bool StorageInMemoryMetadata::hasRowsTTL() const
|
||||
{
|
||||
return table_ttl.rows_ttl.expression != nullptr;
|
||||
return table_ttl.rows_ttl.expression_ast != nullptr;
|
||||
}
|
||||
|
||||
TTLDescriptions StorageInMemoryMetadata::getRowsWhereTTLs() const
|
||||
@ -258,9 +258,8 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(
|
||||
NameSet required_ttl_columns;
|
||||
NameSet updated_ttl_columns;
|
||||
|
||||
auto add_dependent_columns = [&updated_columns](const auto & expression, auto & to_set)
|
||||
auto add_dependent_columns = [&updated_columns](const Names & required_columns, auto & to_set)
|
||||
{
|
||||
auto required_columns = expression->getRequiredColumns();
|
||||
for (const auto & dependency : required_columns)
|
||||
{
|
||||
if (updated_columns.contains(dependency))
|
||||
@ -276,18 +275,18 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(
|
||||
for (const auto & index : getSecondaryIndices())
|
||||
{
|
||||
if (has_dependency(index.name, ColumnDependency::SKIP_INDEX))
|
||||
add_dependent_columns(index.expression, indices_columns);
|
||||
add_dependent_columns(index.expression->getRequiredColumns(), indices_columns);
|
||||
}
|
||||
|
||||
for (const auto & projection : getProjections())
|
||||
{
|
||||
if (has_dependency(projection.name, ColumnDependency::PROJECTION))
|
||||
add_dependent_columns(&projection, projections_columns);
|
||||
add_dependent_columns(projection.getRequiredColumns(), projections_columns);
|
||||
}
|
||||
|
||||
auto add_for_rows_ttl = [&](const auto & expression, auto & to_set)
|
||||
{
|
||||
if (add_dependent_columns(expression, to_set) && include_ttl_target)
|
||||
if (add_dependent_columns(expression.getNames(), to_set) && include_ttl_target)
|
||||
{
|
||||
/// Filter all columns, if rows TTL expression have to be recalculated.
|
||||
for (const auto & column : getColumns().getAllPhysical())
|
||||
@ -296,25 +295,25 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(
|
||||
};
|
||||
|
||||
if (hasRowsTTL())
|
||||
add_for_rows_ttl(getRowsTTL().expression, required_ttl_columns);
|
||||
add_for_rows_ttl(getRowsTTL().expression_columns, required_ttl_columns);
|
||||
|
||||
for (const auto & entry : getRowsWhereTTLs())
|
||||
add_for_rows_ttl(entry.expression, required_ttl_columns);
|
||||
add_for_rows_ttl(entry.expression_columns, required_ttl_columns);
|
||||
|
||||
for (const auto & entry : getGroupByTTLs())
|
||||
add_for_rows_ttl(entry.expression, required_ttl_columns);
|
||||
add_for_rows_ttl(entry.expression_columns, required_ttl_columns);
|
||||
|
||||
for (const auto & entry : getRecompressionTTLs())
|
||||
add_dependent_columns(entry.expression, required_ttl_columns);
|
||||
add_dependent_columns(entry.expression_columns.getNames(), required_ttl_columns);
|
||||
|
||||
for (const auto & [name, entry] : getColumnTTLs())
|
||||
{
|
||||
if (add_dependent_columns(entry.expression, required_ttl_columns) && include_ttl_target)
|
||||
if (add_dependent_columns(entry.expression_columns.getNames(), required_ttl_columns) && include_ttl_target)
|
||||
updated_ttl_columns.insert(name);
|
||||
}
|
||||
|
||||
for (const auto & entry : getMoveTTLs())
|
||||
add_dependent_columns(entry.expression, required_ttl_columns);
|
||||
add_dependent_columns(entry.expression_columns.getNames(), required_ttl_columns);
|
||||
|
||||
//TODO what about rows_where_ttl and group_by_ttl ??
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include <Interpreters/FunctionNameNormalizer.h>
|
||||
#include <Parsers/ExpressionListParsers.h>
|
||||
#include <Parsers/parseQuery.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -110,7 +111,10 @@ using FindAggregateFunctionVisitor = InDepthNodeVisitor<FindAggregateFunctionFin
|
||||
TTLDescription::TTLDescription(const TTLDescription & other)
|
||||
: mode(other.mode)
|
||||
, expression_ast(other.expression_ast ? other.expression_ast->clone() : nullptr)
|
||||
, expression_columns(other.expression_columns)
|
||||
, result_column(other.result_column)
|
||||
, where_expression_ast(other.where_expression_ast ? other.where_expression_ast->clone() : nullptr)
|
||||
, where_expression_columns(other.where_expression_columns)
|
||||
, where_result_column(other.where_result_column)
|
||||
, group_by_keys(other.group_by_keys)
|
||||
, set_parts(other.set_parts)
|
||||
@ -120,11 +124,6 @@ TTLDescription::TTLDescription(const TTLDescription & other)
|
||||
, if_exists(other.if_exists)
|
||||
, recompression_codec(other.recompression_codec)
|
||||
{
|
||||
if (other.expression)
|
||||
expression = other.expression->clone();
|
||||
|
||||
if (other.where_expression)
|
||||
where_expression = other.where_expression->clone();
|
||||
}
|
||||
|
||||
TTLDescription & TTLDescription::operator=(const TTLDescription & other)
|
||||
@ -138,17 +137,15 @@ TTLDescription & TTLDescription::operator=(const TTLDescription & other)
|
||||
else
|
||||
expression_ast.reset();
|
||||
|
||||
if (other.expression)
|
||||
expression = other.expression->clone();
|
||||
else
|
||||
expression.reset();
|
||||
|
||||
expression_columns = other.expression_columns;
|
||||
result_column = other.result_column;
|
||||
if (other.where_expression)
|
||||
where_expression = other.where_expression->clone();
|
||||
else
|
||||
where_expression.reset();
|
||||
|
||||
if (other.where_expression_ast)
|
||||
where_expression_ast = other.where_expression_ast->clone();
|
||||
else
|
||||
where_expression_ast.reset();
|
||||
|
||||
where_expression_columns = other.where_expression_columns;
|
||||
where_result_column = other.where_result_column;
|
||||
group_by_keys = other.group_by_keys;
|
||||
set_parts = other.set_parts;
|
||||
@ -165,6 +162,44 @@ TTLDescription & TTLDescription::operator=(const TTLDescription & other)
|
||||
return * this;
|
||||
}
|
||||
|
||||
static ExpressionAndSets buildExpressionAndSets(ASTPtr & ast, const NamesAndTypesList & columns, const ContextPtr & context)
|
||||
{
|
||||
ExpressionAndSets result;
|
||||
auto ttl_string = queryToString(ast);
|
||||
auto syntax_analyzer_result = TreeRewriter(context).analyze(ast, columns);
|
||||
ExpressionAnalyzer analyzer(ast, syntax_analyzer_result, context);
|
||||
auto dag = analyzer.getActionsDAG(false);
|
||||
|
||||
const auto * col = &dag->findInOutputs(ast->getColumnName());
|
||||
if (col->result_name != ttl_string)
|
||||
col = &dag->addAlias(*col, ttl_string);
|
||||
|
||||
dag->getOutputs() = {col};
|
||||
dag->removeUnusedActions();
|
||||
|
||||
result.expression = std::make_shared<ExpressionActions>(dag, ExpressionActionsSettings::fromContext(context));
|
||||
result.sets = analyzer.getPreparedSets();
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
ExpressionAndSets TTLDescription::buildExpression(const ContextPtr & context) const
|
||||
{
|
||||
auto ast = expression_ast->clone();
|
||||
return buildExpressionAndSets(ast, expression_columns, context);
|
||||
}
|
||||
|
||||
ExpressionAndSets TTLDescription::buildWhereExpression(const ContextPtr & context) const
|
||||
{
|
||||
if (where_expression_ast)
|
||||
{
|
||||
auto ast = where_expression_ast->clone();
|
||||
return buildExpressionAndSets(ast, where_expression_columns, context);
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
TTLDescription TTLDescription::getTTLFromAST(
|
||||
const ASTPtr & definition_ast,
|
||||
const ColumnsDescription & columns,
|
||||
@ -182,9 +217,12 @@ TTLDescription TTLDescription::getTTLFromAST(
|
||||
result.expression_ast = definition_ast->clone();
|
||||
|
||||
auto ttl_ast = result.expression_ast->clone();
|
||||
auto syntax_analyzer_result = TreeRewriter(context).analyze(ttl_ast, columns.getAllPhysical());
|
||||
result.expression = ExpressionAnalyzer(ttl_ast, syntax_analyzer_result, context).getActions(false);
|
||||
result.result_column = ttl_ast->getColumnName();
|
||||
auto expression = buildExpressionAndSets(ttl_ast, columns.getAllPhysical(), context).expression;
|
||||
result.expression_columns = expression->getRequiredColumnsWithTypes();
|
||||
|
||||
result.result_column = expression->getSampleBlock().safeGetByPosition(0).name;
|
||||
|
||||
ExpressionActionsPtr where_expression;
|
||||
|
||||
if (ttl_element == nullptr) /// columns TTL
|
||||
{
|
||||
@ -202,9 +240,10 @@ TTLDescription TTLDescription::getTTLFromAST(
|
||||
{
|
||||
if (ASTPtr where_expr_ast = ttl_element->where())
|
||||
{
|
||||
auto where_syntax_result = TreeRewriter(context).analyze(where_expr_ast, columns.getAllPhysical());
|
||||
result.where_expression = ExpressionAnalyzer(where_expr_ast, where_syntax_result, context).getActions(false);
|
||||
result.where_result_column = where_expr_ast->getColumnName();
|
||||
result.where_expression_ast = where_expr_ast->clone();
|
||||
where_expression = buildExpressionAndSets(where_expr_ast, columns.getAllPhysical(), context).expression;
|
||||
result.where_expression_columns = where_expression->getRequiredColumnsWithTypes();
|
||||
result.where_result_column = where_expression->getSampleBlock().safeGetByPosition(0).name;
|
||||
}
|
||||
}
|
||||
else if (ttl_element->mode == TTLMode::GROUP_BY)
|
||||
@ -229,17 +268,17 @@ TTLDescription TTLDescription::getTTLFromAST(
|
||||
for (const auto & ast : ttl_element->group_by_assignments)
|
||||
{
|
||||
const auto assignment = ast->as<const ASTAssignment &>();
|
||||
auto expression = assignment.expression();
|
||||
auto ass_expression = assignment.expression();
|
||||
|
||||
FindAggregateFunctionVisitor::Data data{false};
|
||||
FindAggregateFunctionVisitor(data).visit(expression);
|
||||
FindAggregateFunctionVisitor(data).visit(ass_expression);
|
||||
|
||||
if (!data.has_aggregate_function)
|
||||
throw Exception(ErrorCodes::BAD_TTL_EXPRESSION,
|
||||
"Invalid expression for assignment of column {}. Should contain an aggregate function", assignment.column_name);
|
||||
|
||||
expression = addTypeConversionToAST(std::move(expression), columns.getPhysical(assignment.column_name).type->getName());
|
||||
aggregations.emplace_back(assignment.column_name, std::move(expression));
|
||||
ass_expression = addTypeConversionToAST(std::move(ass_expression), columns.getPhysical(assignment.column_name).type->getName());
|
||||
aggregations.emplace_back(assignment.column_name, std::move(ass_expression));
|
||||
aggregation_columns_set.insert(assignment.column_name);
|
||||
}
|
||||
|
||||
@ -297,7 +336,7 @@ TTLDescription TTLDescription::getTTLFromAST(
|
||||
}
|
||||
}
|
||||
|
||||
checkTTLExpression(result.expression, result.result_column, is_attach || context->getSettingsRef().allow_suspicious_ttl_expressions);
|
||||
checkTTLExpression(expression, result.result_column, is_attach || context->getSettingsRef().allow_suspicious_ttl_expressions);
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -350,7 +389,7 @@ TTLTableDescription TTLTableDescription::getTTLForTableFromAST(
|
||||
auto ttl = TTLDescription::getTTLFromAST(ttl_element_ptr, columns, context, primary_key, is_attach);
|
||||
if (ttl.mode == TTLMode::DELETE)
|
||||
{
|
||||
if (!ttl.where_expression)
|
||||
if (!ttl.where_expression_ast)
|
||||
{
|
||||
if (have_unconditional_delete_ttl)
|
||||
throw Exception(ErrorCodes::BAD_TTL_EXPRESSION, "More than one DELETE TTL expression without WHERE expression is not allowed");
|
||||
|
@ -35,6 +35,15 @@ struct TTLAggregateDescription
|
||||
|
||||
using TTLAggregateDescriptions = std::vector<TTLAggregateDescription>;
|
||||
|
||||
class PreparedSets;
|
||||
using PreparedSetsPtr = std::shared_ptr<PreparedSets>;
|
||||
|
||||
struct ExpressionAndSets
|
||||
{
|
||||
ExpressionActionsPtr expression;
|
||||
PreparedSetsPtr sets;
|
||||
};
|
||||
|
||||
/// Common struct for TTL record in storage
|
||||
struct TTLDescription
|
||||
{
|
||||
@ -44,9 +53,10 @@ struct TTLDescription
|
||||
/// TTL d + INTERVAL 1 DAY
|
||||
/// ^~~~~~~~~~~~~~~~~~~^
|
||||
ASTPtr expression_ast;
|
||||
NamesAndTypesList expression_columns;
|
||||
|
||||
/// Expression actions evaluated from AST
|
||||
ExpressionActionsPtr expression;
|
||||
ExpressionAndSets buildExpression(const ContextPtr & context) const;
|
||||
|
||||
/// Result column of this TTL expression
|
||||
String result_column;
|
||||
@ -54,7 +64,9 @@ struct TTLDescription
|
||||
/// WHERE part in TTL expression
|
||||
/// TTL ... WHERE x % 10 == 0 and y > 5
|
||||
/// ^~~~~~~~~~~~~~~~~~~~~~^
|
||||
ExpressionActionsPtr where_expression;
|
||||
ASTPtr where_expression_ast;
|
||||
NamesAndTypesList where_expression_columns;
|
||||
ExpressionAndSets buildWhereExpression(const ContextPtr & context) const;
|
||||
|
||||
/// Name of result column from WHERE expression
|
||||
String where_result_column;
|
||||
|
@ -155,7 +155,7 @@ def test_recompression_multiple_ttls(started_cluster):
|
||||
node2.query(
|
||||
"SELECT recompression_ttl_info.expression FROM system.parts where name = 'all_1_1_4'"
|
||||
)
|
||||
== "['plus(d, toIntervalSecond(10))','plus(d, toIntervalSecond(15))','plus(d, toIntervalSecond(5))']\n"
|
||||
== "['d + toIntervalSecond(10)','d + toIntervalSecond(15)','d + toIntervalSecond(5)']\n"
|
||||
)
|
||||
|
||||
|
||||
|
@ -13,9 +13,9 @@ CREATE TABLE default.recompression_table\n(\n `dt` DateTime,\n `key` UInt6
|
||||
1_1_1 LZ4
|
||||
2_2_2 ZSTD(12)
|
||||
3_3_3 ZSTD(12)
|
||||
1_1_1 ['plus(dt, toIntervalDay(1))']
|
||||
2_2_2 ['plus(dt, toIntervalDay(1))']
|
||||
3_3_3 ['plus(dt, toIntervalDay(1))']
|
||||
1_1_1 ['dt + toIntervalDay(1)']
|
||||
2_2_2 ['dt + toIntervalDay(1)']
|
||||
3_3_3 ['dt + toIntervalDay(1)']
|
||||
1_1_1 LZ4
|
||||
2_2_2 LZ4
|
||||
3_3_3 LZ4
|
||||
|
3
tests/queries/0_stateless/02932_set_ttl_where.reference
Normal file
3
tests/queries/0_stateless/02932_set_ttl_where.reference
Normal file
@ -0,0 +1,3 @@
|
||||
0
|
||||
0
|
||||
0
|
18
tests/queries/0_stateless/02932_set_ttl_where.sql
Normal file
18
tests/queries/0_stateless/02932_set_ttl_where.sql
Normal file
@ -0,0 +1,18 @@
|
||||
-- Tags: no-ordinary-database
|
||||
|
||||
create or replace table t_temp (
|
||||
a UInt32,
|
||||
timestamp DateTime
|
||||
)
|
||||
engine = MergeTree
|
||||
order by a
|
||||
TTL timestamp + INTERVAL 2 SECOND WHERE a in (select number from system.numbers limit 100_000);
|
||||
|
||||
select sleep(1);
|
||||
insert into t_temp select rand(), now() from system.numbers limit 1_000_000;
|
||||
select sleep(1);
|
||||
insert into t_temp select rand(), now() from system.numbers limit 1_000_000;
|
||||
select sleep(1);
|
||||
optimize table t_temp final;
|
||||
|
||||
DROP TABLE t_temp;
|
Loading…
Reference in New Issue
Block a user