Merge pull request #15450 from CurtizJ/fix-ttl-group-by

Fix some cases of TTL expressions
alesapin 2021-02-01 16:48:07 +03:00 committed by GitHub
commit 2aa8a6304b
36 changed files with 1218 additions and 624 deletions

src/DataStreams/ITTLAlgorithm.cpp

@@ -0,0 +1,65 @@
#include <DataStreams/ITTLAlgorithm.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnConst.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ITTLAlgorithm::ITTLAlgorithm(
const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
: description(description_)
, old_ttl_info(old_ttl_info_)
, current_time(current_time_)
, force(force_)
, date_lut(DateLUT::instance())
{
}
bool ITTLAlgorithm::isTTLExpired(time_t ttl) const
{
return (ttl && (ttl <= current_time));
}
ColumnPtr ITTLAlgorithm::executeExpressionAndGetColumn(
const ExpressionActionsPtr & expression, const Block & block, const String & result_column)
{
if (!expression)
return nullptr;
if (block.has(result_column))
return block.getByName(result_column).column;
Block block_copy;
for (const auto & column_name : expression->getRequiredColumns())
block_copy.insert(block.getByName(column_name));
/// Keep number of rows for const expression.
size_t num_rows = block.rows();
expression->execute(block_copy, num_rows);
return block_copy.getByName(result_column).column;
}
UInt32 ITTLAlgorithm::getTimestampByIndex(const IColumn * column, size_t index) const
{
if (const ColumnUInt16 * column_date = typeid_cast<const ColumnUInt16 *>(column))
return date_lut.fromDayNum(DayNum(column_date->getData()[index]));
else if (const ColumnUInt32 * column_date_time = typeid_cast<const ColumnUInt32 *>(column))
return column_date_time->getData()[index];
else if (const ColumnConst * column_const = typeid_cast<const ColumnConst *>(column))
{
if (typeid_cast<const ColumnUInt16 *>(&column_const->getDataColumn()))
return date_lut.fromDayNum(DayNum(column_const->getValue<UInt16>()));
else if (typeid_cast<const ColumnUInt32 *>(&column_const->getDataColumn()))
return column_const->getValue<UInt32>();
}
throw Exception("Unexpected type of result TTL column", ErrorCodes::LOGICAL_ERROR);
}
}

src/DataStreams/ITTLAlgorithm.h

@@ -0,0 +1,54 @@
#pragma once
#include <Storages/TTLDescription.h>
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <common/DateLUT.h>
namespace DB
{
/**
* Represents an action that must be performed on data
* whose TTL has expired: delete, aggregate, etc.
*/
class ITTLAlgorithm
{
public:
using TTLInfo = IMergeTreeDataPart::TTLInfo;
using MutableDataPartPtr = MergeTreeMutableDataPartPtr;
ITTLAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
virtual ~ITTLAlgorithm() = default;
virtual void execute(Block & block) = 0;
/// Updates TTL metadata of the data_part.
virtual void finalize(const MutableDataPartPtr & data_part) const = 0;
bool isMinTTLExpired() const { return force || isTTLExpired(old_ttl_info.min); }
bool isMaxTTLExpired() const { return isTTLExpired(old_ttl_info.max); }
/** This function is needed to avoid a conflict between columns that are already calculated and columns that are needed to execute the TTL.
* If the result column is absent in the block, all required columns are copied to a new block and the expression is executed on it.
*/
static ColumnPtr executeExpressionAndGetColumn(
const ExpressionActionsPtr & expression, const Block & block, const String & result_column);
protected:
bool isTTLExpired(time_t ttl) const;
UInt32 getTimestampByIndex(const IColumn * column, size_t index) const;
const TTLDescription description;
const TTLInfo old_ttl_info;
const time_t current_time;
const bool force;
TTLInfo new_ttl_info;
private:
const DateLUTImpl & date_lut;
};
using TTLAlgorithmPtr = std::unique_ptr<ITTLAlgorithm>;
}
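
For orientation (an editorial sketch, not part of the diff): the refactoring turns each TTL rule into one of these algorithm objects, and the stream drives them all uniformly. The types below are simplified stand-ins, but the control flow mirrors TTLBlockInputStream::readImpl/readSuffixImpl further down: every algorithm sees every block, and each algorithm then writes its recalculated TTL info back into the part.

#include <memory>
#include <vector>

struct Block { /* columns... */ };      // hypothetical stand-in for DB::Block
struct DataPart { /* ttl_infos... */ }; // hypothetical stand-in for the data part

struct ITTLAlgorithm
{
    virtual ~ITTLAlgorithm() = default;
    virtual void execute(Block & block) = 0;          // may drop, aggregate or rewrite rows
    virtual void finalize(DataPart & part) const = 0; // records the recalculated TTL info
};

void processPart(std::vector<std::unique_ptr<ITTLAlgorithm>> & algorithms,
                 std::vector<Block> & blocks, DataPart & part)
{
    for (auto & block : blocks)
        for (auto & algorithm : algorithms)
            algorithm->execute(block); // each rule transforms the block in turn

    Block empty; // an empty block at end-of-stream lets the aggregation rule flush its last groups
    for (auto & algorithm : algorithms)
        algorithm->execute(empty);

    for (const auto & algorithm : algorithms)
        algorithm->finalize(part); // each rule updates the part's ttl_infos
}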

src/DataStreams/TTLAggregationAlgorithm.cpp

@@ -0,0 +1,173 @@
#include <DataStreams/TTLAggregationAlgorithm.h>
namespace DB
{
TTLAggregationAlgorithm::TTLAggregationAlgorithm(
const TTLDescription & description_,
const TTLInfo & old_ttl_info_,
time_t current_time_,
bool force_,
const Block & header_,
const MergeTreeData & storage_)
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
, header(header_)
{
current_key_value.resize(description.group_by_keys.size());
ColumnNumbers keys;
for (const auto & key : description.group_by_keys)
keys.push_back(header.getPositionByName(key));
key_columns.resize(description.group_by_keys.size());
AggregateDescriptions aggregates = description.aggregate_descriptions;
for (auto & descr : aggregates)
if (descr.arguments.empty())
for (const auto & name : descr.argument_names)
descr.arguments.push_back(header.getPositionByName(name));
columns_for_aggregator.resize(description.aggregate_descriptions.size());
const Settings & settings = storage_.global_context.getSettingsRef();
Aggregator::Params params(header, keys, aggregates,
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode, 0, 0,
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
storage_.global_context.getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
aggregator = std::make_unique<Aggregator>(params);
}
void TTLAggregationAlgorithm::execute(Block & block)
{
if (!block)
{
if (!aggregation_result.empty())
{
MutableColumns result_columns = header.cloneEmptyColumns();
finalizeAggregates(result_columns);
block = header.cloneWithColumns(std::move(result_columns));
}
return;
}
const auto & column_names = header.getNames();
MutableColumns result_columns = header.cloneEmptyColumns();
MutableColumns aggregate_columns = header.cloneEmptyColumns();
auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
auto where_column = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column);
size_t rows_aggregated = 0;
size_t current_key_start = 0;
size_t rows_with_current_key = 0;
for (size_t i = 0; i < block.rows(); ++i)
{
UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i);
bool where_filter_passed = !where_column || where_column->getBool(i);
bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed;
bool same_as_current = true;
for (size_t j = 0; j < description.group_by_keys.size(); ++j)
{
const String & key_column = description.group_by_keys[j];
const IColumn * values_column = block.getByName(key_column).column.get();
if (!same_as_current || (*values_column)[i] != current_key_value[j])
{
values_column->get(i, current_key_value[j]);
same_as_current = false;
}
}
if (!same_as_current)
{
if (rows_with_current_key)
calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
finalizeAggregates(result_columns);
current_key_start = rows_aggregated;
rows_with_current_key = 0;
}
if (ttl_expired)
{
++rows_with_current_key;
++rows_aggregated;
for (const auto & name : column_names)
{
const IColumn * values_column = block.getByName(name).column.get();
auto & column = aggregate_columns[header.getPositionByName(name)];
column->insertFrom(*values_column, i);
}
}
else
{
new_ttl_info.update(cur_ttl);
for (const auto & name : column_names)
{
const IColumn * values_column = block.getByName(name).column.get();
auto & column = result_columns[header.getPositionByName(name)];
column->insertFrom(*values_column, i);
}
}
}
if (rows_with_current_key)
calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
block = header.cloneWithColumns(std::move(result_columns));
}
void TTLAggregationAlgorithm::calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length)
{
Columns aggregate_chunk;
aggregate_chunk.reserve(aggregate_columns.size());
for (const auto & name : header.getNames())
{
const auto & column = aggregate_columns[header.getPositionByName(name)];
ColumnPtr chunk_column = column->cut(start_pos, length);
aggregate_chunk.emplace_back(std::move(chunk_column));
}
aggregator->executeOnBlock(aggregate_chunk, length, aggregation_result, key_columns,
columns_for_aggregator, no_more_keys);
}
void TTLAggregationAlgorithm::finalizeAggregates(MutableColumns & result_columns)
{
if (!aggregation_result.empty())
{
auto aggregated_res = aggregator->convertToBlocks(aggregation_result, true, 1);
for (auto & agg_block : aggregated_res)
{
for (const auto & it : description.set_parts)
it.expression->execute(agg_block);
for (const auto & name : description.group_by_keys)
{
const IColumn * values_column = agg_block.getByName(name).column.get();
auto & result_column = result_columns[header.getPositionByName(name)];
result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
}
for (const auto & it : description.set_parts)
{
const IColumn * values_column = agg_block.getByName(it.expression_result_column_name).column.get();
auto & result_column = result_columns[header.getPositionByName(it.column_name)];
result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
}
}
}
aggregation_result.invalidate();
}
void TTLAggregationAlgorithm::finalize(const MutableDataPartPtr & data_part) const
{
data_part->ttl_infos.group_by_ttl[description.result_column] = new_ttl_info;
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
}
}
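
The execute loop above relies on rows arriving ordered by the GROUP BY key (a prefix of the sorting key, per the header below), so each key range is contiguous and can be aggregated in one pass, flushing whenever the key changes. A self-contained sketch of that single-pass pattern, with plain ints standing in for the key columns and aggregate functions:

#include <iostream>
#include <vector>

struct Row { int key; long value; };

int main()
{
    // Input sorted by key, as the storage guarantees for a sorting-key prefix.
    std::vector<Row> rows = {{1, 10}, {1, 20}, {2, 5}, {2, 7}, {3, 1}};

    bool has_current = false;
    int current_key = 0;
    long sum = 0; // stand-in for the real Aggregator state

    for (const auto & row : rows)
    {
        if (has_current && row.key != current_key)
        {
            std::cout << current_key << " -> " << sum << '\n'; // flush finished group
            sum = 0;
        }
        current_key = row.key;
        has_current = true;
        sum += row.value; // accumulate within the current key range
    }
    if (has_current)
        std::cout << current_key << " -> " << sum << '\n'; // flush the last group
}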

src/DataStreams/TTLAggregationAlgorithm.h

@@ -0,0 +1,42 @@
#pragma once
#include <DataStreams/ITTLAlgorithm.h>
#include <Interpreters/Aggregator.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
/// Aggregates rows according to 'TTL expr GROUP BY key' description.
/// Aggregation key must be a prefix of the sorting key.
class TTLAggregationAlgorithm final : public ITTLAlgorithm
{
public:
TTLAggregationAlgorithm(
const TTLDescription & description_,
const TTLInfo & old_ttl_info_,
time_t current_time_,
bool force_,
const Block & header_,
const MergeTreeData & storage_);
void execute(Block & block) override;
void finalize(const MutableDataPartPtr & data_part) const override;
private:
// Calculate aggregates of aggregate_columns into aggregation_result
void calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length);
/// Finalize aggregation_result into result_columns
void finalizeAggregates(MutableColumns & result_columns);
const Block header;
std::unique_ptr<Aggregator> aggregator;
Row current_key_value;
AggregatedDataVariants aggregation_result;
ColumnRawPtrs key_columns;
Aggregator::AggregateColumns columns_for_aggregator;
bool no_more_keys = false;
};
}

src/DataStreams/TTLBlockInputStream.cpp

@@ -8,15 +8,14 @@
 #include <Storages/TTLMode.h>
 #include <Interpreters/Context.h>
+#include <DataStreams/TTLDeleteAlgorithm.h>
+#include <DataStreams/TTLColumnAlgorithm.h>
+#include <DataStreams/TTLAggregationAlgorithm.h>
+#include <DataStreams/TTLUpdateInfoAlgorithm.h>

 namespace DB
 {

-namespace ErrorCodes
-{
-    extern const int LOGICAL_ERROR;
-}
-
 TTLBlockInputStream::TTLBlockInputStream(
     const BlockInputStreamPtr & input_,
     const MergeTreeData & storage_,
@@ -24,83 +23,69 @@ TTLBlockInputStream::TTLBlockInputStream(
     const MergeTreeData::MutableDataPartPtr & data_part_,
     time_t current_time_,
     bool force_)
-    : storage(storage_)
-    , metadata_snapshot(metadata_snapshot_)
-    , data_part(data_part_)
-    , current_time(current_time_)
-    , force(force_)
-    , old_ttl_infos(data_part->ttl_infos)
-    , log(&Poco::Logger::get(storage.getLogName() + " (TTLBlockInputStream)"))
-    , date_lut(DateLUT::instance())
+    : data_part(data_part_)
+    , log(&Poco::Logger::get(storage_.getLogName() + " (TTLBlockInputStream)"))
 {
     children.push_back(input_);
     header = children.at(0)->getHeader();
+    auto old_ttl_infos = data_part->ttl_infos;

-    const auto & storage_columns = metadata_snapshot->getColumns();
-    const auto & column_defaults = storage_columns.getDefaults();
-
-    ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
-    for (const auto & [name, _] : metadata_snapshot->getColumnTTLs())
-    {
-        auto it = column_defaults.find(name);
-        if (it != column_defaults.end())
-        {
-            auto column = storage_columns.get(name);
-            auto expression = it->second.expression->clone();
-            default_expr_list->children.emplace_back(setAlias(addTypeConversionToAST(std::move(expression), column.type->getName()), it->first));
-        }
-    }
-
-    for (const auto & [name, ttl_info] : old_ttl_infos.columns_ttl)
-    {
-        if (force || isTTLExpired(ttl_info.min))
-        {
-            new_ttl_infos.columns_ttl.emplace(name, IMergeTreeDataPart::TTLInfo{});
-            empty_columns.emplace(name);
-        }
-        else
-            new_ttl_infos.columns_ttl.emplace(name, ttl_info);
-    }
-
-    if (!force && !isTTLExpired(old_ttl_infos.table_ttl.min))
-        new_ttl_infos.table_ttl = old_ttl_infos.table_ttl;
-
-    if (!default_expr_list->children.empty())
-    {
-        auto syntax_result = TreeRewriter(storage.global_context).analyze(default_expr_list, metadata_snapshot->getColumns().getAllPhysical());
-        defaults_expression = ExpressionAnalyzer{default_expr_list, syntax_result, storage.global_context}.getActions(true);
-    }
-
-    auto storage_rows_ttl = metadata_snapshot->getRowsTTL();
-    if (metadata_snapshot->hasRowsTTL() && storage_rows_ttl.mode == TTLMode::GROUP_BY)
-    {
-        current_key_value.resize(storage_rows_ttl.group_by_keys.size());
-
-        ColumnNumbers keys;
-        for (const auto & key : storage_rows_ttl.group_by_keys)
-            keys.push_back(header.getPositionByName(key));
-
-        agg_key_columns.resize(storage_rows_ttl.group_by_keys.size());
-        AggregateDescriptions aggregates = storage_rows_ttl.aggregate_descriptions;
-
-        for (auto & descr : aggregates)
-            if (descr.arguments.empty())
-                for (const auto & name : descr.argument_names)
-                    descr.arguments.push_back(header.getPositionByName(name));
-
-        agg_aggregate_columns.resize(storage_rows_ttl.aggregate_descriptions.size());
-
-        const Settings & settings = storage.global_context.getSettingsRef();
-
-        Aggregator::Params params(header, keys, aggregates,
-            false, settings.max_rows_to_group_by, settings.group_by_overflow_mode, 0, 0,
-            settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
-            storage.global_context.getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
-
-        aggregator = std::make_unique<Aggregator>(params);
-    }
-}
-
-bool TTLBlockInputStream::isTTLExpired(time_t ttl) const
-{
-    return (ttl && (ttl <= current_time));
+    if (metadata_snapshot_->hasRowsTTL())
+    {
+        const auto & rows_ttl = metadata_snapshot_->getRowsTTL();
+        auto algorithm = std::make_unique<TTLDeleteAlgorithm>(
+            rows_ttl, old_ttl_infos.table_ttl, current_time_, force_);
+
+        /// Skip all data if table ttl is expired for part
+        if (algorithm->isMaxTTLExpired() && !rows_ttl.where_expression)
+            all_data_dropped = true;
+
+        delete_algorithm = algorithm.get();
+        algorithms.emplace_back(std::move(algorithm));
+    }
+
+    for (const auto & where_ttl : metadata_snapshot_->getRowsWhereTTLs())
+        algorithms.emplace_back(std::make_unique<TTLDeleteAlgorithm>(
+            where_ttl, old_ttl_infos.rows_where_ttl[where_ttl.result_column], current_time_, force_));
+
+    for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs())
+        algorithms.emplace_back(std::make_unique<TTLAggregationAlgorithm>(
+            group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_, header, storage_));
+
+    if (metadata_snapshot_->hasAnyColumnTTL())
+    {
+        const auto & storage_columns = metadata_snapshot_->getColumns();
+        const auto & column_defaults = storage_columns.getDefaults();
+
+        for (const auto & [name, description] : metadata_snapshot_->getColumnTTLs())
+        {
+            ExpressionActionsPtr default_expression;
+            String default_column_name;
+            auto it = column_defaults.find(name);
+            if (it != column_defaults.end())
+            {
+                const auto & column = storage_columns.get(name);
+                auto default_ast = it->second.expression->clone();
+                default_ast = addTypeConversionToAST(std::move(default_ast), column.type->getName());
+
+                auto syntax_result = TreeRewriter(storage_.global_context).analyze(default_ast, metadata_snapshot_->getColumns().getAllPhysical());
+                default_expression = ExpressionAnalyzer{default_ast, syntax_result, storage_.global_context}.getActions(true);
+                default_column_name = default_ast->getColumnName();
+            }
+
+            algorithms.emplace_back(std::make_unique<TTLColumnAlgorithm>(
+                description, old_ttl_infos.columns_ttl[name], current_time_,
+                force_, name, default_expression, default_column_name));
+        }
+    }
+
+    for (const auto & move_ttl : metadata_snapshot_->getMoveTTLs())
+        algorithms.emplace_back(std::make_unique<TTLMoveAlgorithm>(
+            move_ttl, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
+
+    for (const auto & recompression_ttl : metadata_snapshot_->getRecompressionTTLs())
+        algorithms.emplace_back(std::make_unique<TTLRecompressionAlgorithm>(
+            recompression_ttl, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
 }

 Block reorderColumns(Block block, const Block & header)
@@ -114,321 +99,30 @@ Block reorderColumns(Block block, const Block & header)
 Block TTLBlockInputStream::readImpl()
 {
-    /// Skip all data if table ttl is expired for part
-    auto storage_rows_ttl = metadata_snapshot->getRowsTTL();
-    if (metadata_snapshot->hasRowsTTL() && !storage_rows_ttl.where_expression && storage_rows_ttl.mode != TTLMode::GROUP_BY
-        && isTTLExpired(old_ttl_infos.table_ttl.max))
-    {
-        rows_removed = data_part->rows_count;
-        return {};
-    }
-
-    Block block = children.at(0)->read();
-    if (!block)
-    {
-        if (aggregator && !agg_result.empty())
-        {
-            MutableColumns result_columns = header.cloneEmptyColumns();
-            finalizeAggregates(result_columns);
-            block = header.cloneWithColumns(std::move(result_columns));
-        }
-        return block;
-    }
-
-    if (metadata_snapshot->hasRowsTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min)))
-        removeRowsWithExpiredTableTTL(block);
-
-    removeValuesWithExpiredColumnTTL(block);
-    updateMovesTTL(block);
-    updateRecompressionTTL(block);
+    if (all_data_dropped)
+        return {};
+
+    auto block = children.at(0)->read();
+    for (const auto & algorithm : algorithms)
+        algorithm->execute(block);
+
+    if (!block)
+        return block;

     return reorderColumns(std::move(block), header);
 }

 void TTLBlockInputStream::readSuffixImpl()
 {
-    for (const auto & elem : new_ttl_infos.columns_ttl)
-        new_ttl_infos.updatePartMinMaxTTL(elem.second.min, elem.second.max);
-
-    new_ttl_infos.updatePartMinMaxTTL(new_ttl_infos.table_ttl.min, new_ttl_infos.table_ttl.max);
-
-    data_part->ttl_infos = std::move(new_ttl_infos);
-    data_part->expired_columns = std::move(empty_columns);
+    data_part->ttl_infos = {};
+    for (const auto & algorithm : algorithms)
+        algorithm->finalize(data_part);

-    if (rows_removed)
+    if (delete_algorithm)
+    {
+        size_t rows_removed = all_data_dropped ? data_part->rows_count : delete_algorithm->getNumberOfRemovedRows();
         LOG_DEBUG(log, "Removed {} rows with expired TTL from part {}", rows_removed, data_part->name);
+    }
 }

-void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
-{
-    auto rows_ttl = metadata_snapshot->getRowsTTL();
-
-    rows_ttl.expression->execute(block);
-    if (rows_ttl.where_expression)
-        rows_ttl.where_expression->execute(block);
-
-    const IColumn * ttl_column =
-        block.getByName(rows_ttl.result_column).column.get();
-
-    const IColumn * where_result_column = rows_ttl.where_expression ?
-        block.getByName(rows_ttl.where_result_column).column.get() : nullptr;
-
-    const auto & column_names = header.getNames();
-
-    if (!aggregator)
-    {
-        MutableColumns result_columns;
-        result_columns.reserve(column_names.size());
-        for (auto it = column_names.begin(); it != column_names.end(); ++it)
-        {
-            const IColumn * values_column = block.getByName(*it).column.get();
-            MutableColumnPtr result_column = values_column->cloneEmpty();
-            result_column->reserve(block.rows());
-
-            for (size_t i = 0; i < block.rows(); ++i)
-            {
-                UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
-                bool where_filter_passed = !where_result_column || where_result_column->getBool(i);
-                if (!isTTLExpired(cur_ttl) || !where_filter_passed)
-                {
-                    new_ttl_infos.table_ttl.update(cur_ttl);
-                    result_column->insertFrom(*values_column, i);
-                }
-                else if (it == column_names.begin())
-                    ++rows_removed;
-            }
-            result_columns.emplace_back(std::move(result_column));
-        }
-        block = header.cloneWithColumns(std::move(result_columns));
-    }
-    else
-    {
-        MutableColumns result_columns = header.cloneEmptyColumns();
-        MutableColumns aggregate_columns = header.cloneEmptyColumns();
-
-        size_t rows_aggregated = 0;
-        size_t current_key_start = 0;
-        size_t rows_with_current_key = 0;
-        auto storage_rows_ttl = metadata_snapshot->getRowsTTL();
-        for (size_t i = 0; i < block.rows(); ++i)
-        {
-            UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
-            bool where_filter_passed = !where_result_column || where_result_column->getBool(i);
-            bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed;
-
-            bool same_as_current = true;
-            for (size_t j = 0; j < storage_rows_ttl.group_by_keys.size(); ++j)
-            {
-                const String & key_column = storage_rows_ttl.group_by_keys[j];
-                const IColumn * values_column = block.getByName(key_column).column.get();
-                if (!same_as_current || (*values_column)[i] != current_key_value[j])
-                {
-                    values_column->get(i, current_key_value[j]);
-                    same_as_current = false;
-                }
-            }
-            if (!same_as_current)
-            {
-                if (rows_with_current_key)
-                    calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
-                finalizeAggregates(result_columns);
-
-                current_key_start = rows_aggregated;
-                rows_with_current_key = 0;
-            }
-
-            if (ttl_expired)
-            {
-                ++rows_with_current_key;
-                ++rows_aggregated;
-                for (const auto & name : column_names)
-                {
-                    const IColumn * values_column = block.getByName(name).column.get();
-                    auto & column = aggregate_columns[header.getPositionByName(name)];
-                    column->insertFrom(*values_column, i);
-                }
-            }
-            else
-            {
-                new_ttl_infos.table_ttl.update(cur_ttl);
-                for (const auto & name : column_names)
-                {
-                    const IColumn * values_column = block.getByName(name).column.get();
-                    auto & column = result_columns[header.getPositionByName(name)];
-                    column->insertFrom(*values_column, i);
-                }
-            }
-        }
-
-        if (rows_with_current_key)
-            calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
-
-        block = header.cloneWithColumns(std::move(result_columns));
-    }
-}
-
-void TTLBlockInputStream::calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length)
-{
-    Columns aggregate_chunk;
-    aggregate_chunk.reserve(aggregate_columns.size());
-    for (const auto & name : header.getNames())
-    {
-        const auto & column = aggregate_columns[header.getPositionByName(name)];
-        ColumnPtr chunk_column = column->cut(start_pos, length);
-        aggregate_chunk.emplace_back(std::move(chunk_column));
-    }
-
-    aggregator->executeOnBlock(aggregate_chunk, length, agg_result, agg_key_columns,
-                               agg_aggregate_columns, agg_no_more_keys);
-}
-
-void TTLBlockInputStream::finalizeAggregates(MutableColumns & result_columns)
-{
-    if (!agg_result.empty())
-    {
-        auto aggregated_res = aggregator->convertToBlocks(agg_result, true, 1);
-        auto storage_rows_ttl = metadata_snapshot->getRowsTTL();
-        for (auto & agg_block : aggregated_res)
-        {
-            for (const auto & it : storage_rows_ttl.set_parts)
-                it.expression->execute(agg_block);
-
-            for (const auto & name : storage_rows_ttl.group_by_keys)
-            {
-                const IColumn * values_column = agg_block.getByName(name).column.get();
-                auto & result_column = result_columns[header.getPositionByName(name)];
-                result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
-            }
-
-            for (const auto & it : storage_rows_ttl.set_parts)
-            {
-                const IColumn * values_column = agg_block.getByName(it.expression_result_column_name).column.get();
-                auto & result_column = result_columns[header.getPositionByName(it.column_name)];
-                result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
-            }
-        }
-    }
-
-    agg_result.invalidate();
-}
-
-void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
-{
-    Block block_with_defaults;
-    if (defaults_expression)
-    {
-        block_with_defaults = block;
-        defaults_expression->execute(block_with_defaults);
-    }
-
-    std::vector<String> columns_to_remove;
-    for (const auto & [name, ttl_entry] : metadata_snapshot->getColumnTTLs())
-    {
-        /// If we read not all table columns. E.g. while mutation.
-        if (!block.has(name))
-            continue;
-
-        const auto & old_ttl_info = old_ttl_infos.columns_ttl[name];
-        auto & new_ttl_info = new_ttl_infos.columns_ttl[name];
-
-        /// Nothing to do
-        if (!force && !isTTLExpired(old_ttl_info.min))
-            continue;
-
-        /// Later drop full column
-        if (isTTLExpired(old_ttl_info.max))
-            continue;
-
-        if (!block.has(ttl_entry.result_column))
-        {
-            columns_to_remove.push_back(ttl_entry.result_column);
-            ttl_entry.expression->execute(block);
-        }
-
-        ColumnPtr default_column = nullptr;
-        if (block_with_defaults.has(name))
-            default_column = block_with_defaults.getByName(name).column->convertToFullColumnIfConst();
-
-        auto & column_with_type = block.getByName(name);
-        const IColumn * values_column = column_with_type.column.get();
-        MutableColumnPtr result_column = values_column->cloneEmpty();
-        result_column->reserve(block.rows());
-
-        const IColumn * ttl_column = block.getByName(ttl_entry.result_column).column.get();
-
-        for (size_t i = 0; i < block.rows(); ++i)
-        {
-            UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
-            if (isTTLExpired(cur_ttl))
-            {
-                if (default_column)
-                    result_column->insertFrom(*default_column, i);
-                else
-                    result_column->insertDefault();
-            }
-            else
-            {
-                new_ttl_info.update(cur_ttl);
-                empty_columns.erase(name);
-                result_column->insertFrom(*values_column, i);
-            }
-        }
-        column_with_type.column = std::move(result_column);
-    }
-
-    for (const String & column : columns_to_remove)
-        block.erase(column);
-}
-
-void TTLBlockInputStream::updateTTLWithDescriptions(Block & block, const TTLDescriptions & descriptions, TTLInfoMap & ttl_info_map)
-{
-    std::vector<String> columns_to_remove;
-    for (const auto & ttl_entry : descriptions)
-    {
-        auto & new_ttl_info = ttl_info_map[ttl_entry.result_column];
-
-        if (!block.has(ttl_entry.result_column))
-        {
-            columns_to_remove.push_back(ttl_entry.result_column);
-            ttl_entry.expression->execute(block);
-        }
-
-        const IColumn * ttl_column = block.getByName(ttl_entry.result_column).column.get();
-
-        for (size_t i = 0; i < block.rows(); ++i)
-        {
-            UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
-            new_ttl_info.update(cur_ttl);
-        }
-    }
-
-    for (const String & column : columns_to_remove)
-        block.erase(column);
-}
-
-void TTLBlockInputStream::updateMovesTTL(Block & block)
-{
-    updateTTLWithDescriptions(block, metadata_snapshot->getMoveTTLs(), new_ttl_infos.moves_ttl);
-}
-
-void TTLBlockInputStream::updateRecompressionTTL(Block & block)
-{
-    updateTTLWithDescriptions(block, metadata_snapshot->getRecompressionTTLs(), new_ttl_infos.recompression_ttl);
-}
-
-UInt32 TTLBlockInputStream::getTimestampByIndex(const IColumn * column, size_t ind)
-{
-    if (const ColumnUInt16 * column_date = typeid_cast<const ColumnUInt16 *>(column))
-        return date_lut.fromDayNum(DayNum(column_date->getData()[ind]));
-    else if (const ColumnUInt32 * column_date_time = typeid_cast<const ColumnUInt32 *>(column))
-        return column_date_time->getData()[ind];
-    else if (const ColumnConst * column_const = typeid_cast<const ColumnConst *>(column))
-    {
-        if (typeid_cast<const ColumnUInt16 *>(&column_const->getDataColumn()))
-            return date_lut.fromDayNum(DayNum(column_const->getValue<UInt16>()));
-        else if (typeid_cast<const ColumnUInt32 *>(&column_const->getDataColumn()))
-            return column_const->getValue<UInt32>();
-    }
-
-    throw Exception("Unexpected type of result TTL column", ErrorCodes::LOGICAL_ERROR);
-}
-
 }

src/DataStreams/TTLBlockInputStream.h

@@ -3,8 +3,9 @@
 #include <Storages/MergeTree/MergeTreeData.h>
 #include <Storages/MergeTree/IMergeTreeDataPart.h>
 #include <Core/Block.h>
-#include <Interpreters/Aggregator.h>
 #include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
+#include <DataStreams/ITTLAlgorithm.h>
+#include <DataStreams/TTLDeleteAlgorithm.h>
 #include <common/DateLUT.h>
@@ -24,7 +25,6 @@ public:
     );

     String getName() const override { return "TTL"; }
     Block getHeader() const override { return header; }

 protected:
@@ -34,60 +34,14 @@ protected:
     void readSuffixImpl() override;

 private:
-    const MergeTreeData & storage;
-    StorageMetadataPtr metadata_snapshot;
+    std::vector<TTLAlgorithmPtr> algorithms;
+    const TTLDeleteAlgorithm * delete_algorithm = nullptr;
+    bool all_data_dropped = false;

     /// ttl_infos and empty_columns are updating while reading
     const MergeTreeData::MutableDataPartPtr & data_part;
-
-    time_t current_time;
-    bool force;
-
-    std::unique_ptr<Aggregator> aggregator;
-    std::vector<Field> current_key_value;
-    AggregatedDataVariants agg_result;
-    ColumnRawPtrs agg_key_columns;
-    Aggregator::AggregateColumns agg_aggregate_columns;
-    bool agg_no_more_keys = false;
-
-    IMergeTreeDataPart::TTLInfos old_ttl_infos;
-    IMergeTreeDataPart::TTLInfos new_ttl_infos;
-    NameSet empty_columns;
-
-    size_t rows_removed = 0;
     Poco::Logger * log;
-    const DateLUTImpl & date_lut;
-
-    /// TODO rewrite defaults logic to evaluteMissingDefaults
-    std::unordered_map<String, String> defaults_result_column;
-    ExpressionActionsPtr defaults_expression;
-
     Block header;
-
-private:
-    /// Removes values with expired ttl and computes new_ttl_infos and empty_columns for part
-    void removeValuesWithExpiredColumnTTL(Block & block);
-
-    /// Removes rows with expired table ttl and computes new ttl_infos for part
-    void removeRowsWithExpiredTableTTL(Block & block);
-
-    // Calculate aggregates of aggregate_columns into agg_result
-    void calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length);
-
-    /// Finalize agg_result into result_columns
-    void finalizeAggregates(MutableColumns & result_columns);
-
-    /// Execute description expressions on block and update ttl's in
-    /// ttl_info_map with expression results.
-    void updateTTLWithDescriptions(Block & block, const TTLDescriptions & descriptions, TTLInfoMap & ttl_info_map);
-
-    /// Updates TTL for moves
-    void updateMovesTTL(Block & block);
-
-    /// Update values for recompression TTL using data from block.
-    void updateRecompressionTTL(Block & block);
-
-    UInt32 getTimestampByIndex(const IColumn * column, size_t ind);
-    bool isTTLExpired(time_t ttl) const;
 };

 }

src/DataStreams/TTLColumnAlgorithm.cpp

@@ -0,0 +1,83 @@
#include <DataStreams/TTLColumnAlgorithm.h>
namespace DB
{
TTLColumnAlgorithm::TTLColumnAlgorithm(
const TTLDescription & description_,
const TTLInfo & old_ttl_info_,
time_t current_time_,
bool force_,
const String & column_name_,
const ExpressionActionsPtr & default_expression_,
const String & default_column_name_)
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
, column_name(column_name_)
, default_expression(default_expression_)
, default_column_name(default_column_name_)
{
if (!isMinTTLExpired())
{
new_ttl_info = old_ttl_info;
is_fully_empty = false;
}
}
void TTLColumnAlgorithm::execute(Block & block)
{
if (!block)
return;
/// Not all table columns may be present in the block, e.g. during a mutation.
if (!block.has(column_name))
return;
/// Nothing to do
if (!isMinTTLExpired())
return;
/// Later drop full column
if (isMaxTTLExpired())
return;
auto default_column = executeExpressionAndGetColumn(default_expression, block, default_column_name);
if (default_column)
default_column = default_column->convertToFullColumnIfConst();
auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
auto & column_with_type = block.getByName(column_name);
const IColumn * values_column = column_with_type.column.get();
MutableColumnPtr result_column = values_column->cloneEmpty();
result_column->reserve(block.rows());
for (size_t i = 0; i < block.rows(); ++i)
{
UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i);
if (isTTLExpired(cur_ttl))
{
if (default_column)
result_column->insertFrom(*default_column, i);
else
result_column->insertDefault();
}
else
{
new_ttl_info.update(cur_ttl);
is_fully_empty = false;
result_column->insertFrom(*values_column, i);
}
}
column_with_type.column = std::move(result_column);
}
void TTLColumnAlgorithm::finalize(const MutableDataPartPtr & data_part) const
{
data_part->ttl_infos.columns_ttl[column_name] = new_ttl_info;
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
if (is_fully_empty)
data_part->expired_columns.insert(column_name);
}
}
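
In essence the column rule rewrites cells instead of dropping rows: an expired cell gets the column's default, and the column is marked for removal from the part only if no value survived. A simplified sketch assuming a single int column and precomputed per-row expiry flags (the real code derives them from the TTL expression):

#include <vector>

// Returns true if every value was replaced, i.e. the column may be dropped from the part.
bool replaceExpired(std::vector<int> & column, const std::vector<bool> & expired, int default_value)
{
    bool is_fully_empty = true;
    for (size_t i = 0; i < column.size(); ++i)
    {
        if (expired[i])
            column[i] = default_value; // expired cell: substitute the default
        else
            is_fully_empty = false;    // a live value keeps the column alive
    }
    return is_fully_empty;
}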

src/DataStreams/TTLColumnAlgorithm.h

@@ -0,0 +1,33 @@
#pragma once
#include <DataStreams/ITTLAlgorithm.h>
namespace DB
{
/// Deletes (replaces with default) values in a column according to the column's TTL description.
/// If all values in the column were replaced with defaults, the column won't be written to the part.
class TTLColumnAlgorithm final : public ITTLAlgorithm
{
public:
TTLColumnAlgorithm(
const TTLDescription & description_,
const TTLInfo & old_ttl_info_,
time_t current_time_,
bool force_,
const String & column_name_,
const ExpressionActionsPtr & default_expression_,
const String & default_column_name_);
void execute(Block & block) override;
void finalize(const MutableDataPartPtr & data_part) const override;
private:
const String column_name;
const ExpressionActionsPtr default_expression;
const String default_column_name;
bool is_fully_empty = true;
};
}

src/DataStreams/TTLDeleteAlgorithm.cpp

@@ -0,0 +1,62 @@
#include <DataStreams/TTLDeleteAlgorithm.h>
namespace DB
{
TTLDeleteAlgorithm::TTLDeleteAlgorithm(
const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
{
if (!isMinTTLExpired())
new_ttl_info = old_ttl_info;
}
void TTLDeleteAlgorithm::execute(Block & block)
{
if (!block || !isMinTTLExpired())
return;
auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
auto where_column = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column);
MutableColumns result_columns;
const auto & column_names = block.getNames();
result_columns.reserve(column_names.size());
for (auto it = column_names.begin(); it != column_names.end(); ++it)
{
const IColumn * values_column = block.getByName(*it).column.get();
MutableColumnPtr result_column = values_column->cloneEmpty();
result_column->reserve(block.rows());
for (size_t i = 0; i < block.rows(); ++i)
{
UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i);
bool where_filter_passed = !where_column || where_column->getBool(i);
if (!isTTLExpired(cur_ttl) || !where_filter_passed)
{
new_ttl_info.update(cur_ttl);
result_column->insertFrom(*values_column, i);
}
else if (it == column_names.begin())
++rows_removed;
}
result_columns.emplace_back(std::move(result_column));
}
block = block.cloneWithColumns(std::move(result_columns));
}
void TTLDeleteAlgorithm::finalize(const MutableDataPartPtr & data_part) const
{
if (description.where_expression)
data_part->ttl_infos.rows_where_ttl[description.result_column] = new_ttl_info;
else
data_part->ttl_infos.table_ttl = new_ttl_info;
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
}
}
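
Compressed to its core, the delete rule copies only surviving rows and refreshes the part's TTL info from them; removals are counted on the first column only, so each dropped row is counted once. A sketch with one int column and precomputed per-row TTL timestamps; the min/max fold in TTLInfo is an assumption about what update() does:

#include <utility>
#include <vector>

struct TTLInfo
{
    unsigned min = 0, max = 0; // 0 means "not set" (assumed)
    void update(unsigned t)
    {
        if (t && (min == 0 || t < min))
            min = t;
        if (t > max)
            max = t;
    }
};

size_t dropExpired(std::vector<int> & column, const std::vector<unsigned> & row_ttl,
                   unsigned current_time, TTLInfo & new_info)
{
    std::vector<int> kept;
    kept.reserve(column.size());
    size_t rows_removed = 0;
    for (size_t i = 0; i < column.size(); ++i)
    {
        if (row_ttl[i] && row_ttl[i] <= current_time)
            ++rows_removed;              // expired (and WHERE passed, in the real code): drop
        else
        {
            new_info.update(row_ttl[i]); // survivors define the part's new min/max
            kept.push_back(column[i]);
        }
    }
    column = std::move(kept);
    return rows_removed;
}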

src/DataStreams/TTLDeleteAlgorithm.h

@@ -0,0 +1,23 @@
#pragma once
#include <DataStreams/ITTLAlgorithm.h>
namespace DB
{
/// Deletes rows according to the table's TTL description,
/// with an optional condition in a 'WHERE' clause.
class TTLDeleteAlgorithm final : public ITTLAlgorithm
{
public:
TTLDeleteAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
void execute(Block & block) override;
void finalize(const MutableDataPartPtr & data_part) const override;
size_t getNumberOfRemovedRows() const { return rows_removed; }
private:
size_t rows_removed = 0;
};
}

src/DataStreams/TTLUpdateInfoAlgorithm.cpp

@@ -0,0 +1,47 @@
#include <DataStreams/TTLUpdateInfoAlgorithm.h>
namespace DB
{
TTLUpdateInfoAlgorithm::TTLUpdateInfoAlgorithm(
const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
{
}
void TTLUpdateInfoAlgorithm::execute(Block & block)
{
if (!block)
return;
auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
for (size_t i = 0; i < block.rows(); ++i)
{
UInt32 cur_ttl = ITTLAlgorithm::getTimestampByIndex(ttl_column.get(), i);
new_ttl_info.update(cur_ttl);
}
}
TTLMoveAlgorithm::TTLMoveAlgorithm(
const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
: TTLUpdateInfoAlgorithm(description_, old_ttl_info_, current_time_, force_)
{
}
void TTLMoveAlgorithm::finalize(const MutableDataPartPtr & data_part) const
{
data_part->ttl_infos.moves_ttl[description.result_column] = new_ttl_info;
}
TTLRecompressionAlgorithm::TTLRecompressionAlgorithm(
const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
: TTLUpdateInfoAlgorithm(description_, old_ttl_info_, current_time_, force_)
{
}
void TTLRecompressionAlgorithm::finalize(const MutableDataPartPtr & data_part) const
{
data_part->ttl_infos.recompression_ttl[description.result_column] = new_ttl_info;
}
}
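
Move and recompression rules never modify data; they only fold each row's TTL timestamp into a per-expression min/max, which finalize() then stores into the part's maps (moves_ttl, recompression_ttl). A standalone sketch of that fold; treating 0 as "unset" in update() is an assumption:

#include <iostream>
#include <map>
#include <string>
#include <vector>

struct TTLInfo
{
    unsigned min = 0, max = 0;
    void update(unsigned t)
    {
        if (t && (min == 0 || t < min))
            min = t;
        if (t > max)
            max = t;
    }
};

int main()
{
    std::map<std::string, TTLInfo> moves_ttl; // one entry per TTL expression

    std::vector<unsigned> ttl_column = {1612180000, 1612170000, 1612190000}; // per-row timestamps
    for (unsigned t : ttl_column)
        moves_ttl["d + toIntervalDay(7)"].update(t); // same loop as TTLUpdateInfoAlgorithm::execute

    const auto & info = moves_ttl["d + toIntervalDay(7)"];
    std::cout << "min=" << info.min << " max=" << info.max << '\n'; // min=1612170000 max=1612190000
}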

src/DataStreams/TTLUpdateInfoAlgorithm.h

@@ -0,0 +1,32 @@
#pragma once
#include <DataStreams/ITTLAlgorithm.h>
namespace DB
{
/// Calculates new ttl_info and does nothing with data.
class TTLUpdateInfoAlgorithm : public ITTLAlgorithm
{
public:
TTLUpdateInfoAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
void execute(Block & block) override;
void finalize(const MutableDataPartPtr & data_part) const override = 0;
};
class TTLMoveAlgorithm final : public TTLUpdateInfoAlgorithm
{
public:
TTLMoveAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
void finalize(const MutableDataPartPtr & data_part) const override;
};
class TTLRecompressionAlgorithm final : public TTLUpdateInfoAlgorithm
{
public:
TTLRecompressionAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
void finalize(const MutableDataPartPtr & data_part) const override;
};
}

src/DataStreams/ya.make

@@ -27,6 +27,7 @@ SRCS(
     ExecutionSpeedLimits.cpp
     ExpressionBlockInputStream.cpp
     IBlockInputStream.cpp
+    ITTLAlgorithm.cpp
     InputStreamFromASTInsertQuery.cpp
     InternalTextLogsRowOutputStream.cpp
     LimitBlockInputStream.cpp
@@ -44,7 +45,11 @@ SRCS(
     SquashingBlockInputStream.cpp
     SquashingBlockOutputStream.cpp
     SquashingTransform.cpp
+    TTLAggregationAlgorithm.cpp
     TTLBlockInputStream.cpp
+    TTLColumnAlgorithm.cpp
+    TTLDeleteAlgorithm.cpp
+    TTLUpdateInfoAlgorithm.cpp
     copyData.cpp
     finalizeBlock.cpp
     materializeBlock.cpp

src/Parsers/ASTTTLElement.cpp

@@ -20,7 +20,7 @@ ASTPtr ASTTTLElement::clone() const
     for (auto & expr : clone->group_by_key)
         expr = expr->clone();
-    for (auto & [name, expr] : clone->group_by_aggregations)
+    for (auto & expr : clone->group_by_assignments)
         expr = expr->clone();

     return clone;
@@ -46,15 +46,15 @@ void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & st
                 settings.ostr << ", ";
             (*it)->formatImpl(settings, state, frame);
         }
-        if (!group_by_aggregations.empty())
+        if (!group_by_assignments.empty())
         {
             settings.ostr << " SET ";
-            for (auto it = group_by_aggregations.begin(); it != group_by_aggregations.end(); ++it)
+            for (auto it = group_by_assignments.begin(); it != group_by_assignments.end(); ++it)
             {
-                if (it != group_by_aggregations.begin())
+                if (it != group_by_assignments.begin())
                     settings.ostr << ", ";
-                settings.ostr << it->first << " = ";
-                it->second->formatImpl(settings, state, frame);
+                (*it)->formatImpl(settings, state, frame);
             }
         }
     }

src/Parsers/ASTTTLElement.h

@@ -18,7 +18,7 @@ public:
     String destination_name;
     ASTs group_by_key;
-    std::vector<std::pair<String, ASTPtr>> group_by_aggregations;
+    ASTs group_by_assignments;

     ASTPtr recompression_codec;
src/Parsers/ExpressionElementParsers.cpp

@@ -24,6 +24,7 @@
 #include <Parsers/ASTTTLElement.h>
 #include <Parsers/ASTWindowDefinition.h>
 #include <Parsers/IAST.h>
+#include <Parsers/ASTAssignment.h>
 #include <Parsers/parseIdentifierOrStringLiteral.h>
 #include <Parsers/parseIntervalKind.h>
@@ -2008,9 +2009,12 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
     ParserIdentifier parser_identifier;
     ParserStringLiteral parser_string_literal;
     ParserExpression parser_exp;
-    ParserExpressionList parser_expression_list(false);
+    ParserExpressionList parser_keys_list(false);
     ParserCodec parser_codec;

+    ParserList parser_assignment_list(
+        std::make_unique<ParserAssignment>(), std::make_unique<ParserToken>(TokenType::Comma));
+
     ASTPtr ttl_expr;
     if (!parser_exp.parse(pos, ttl_expr, expected))
         return false;
@@ -2044,9 +2048,9 @@
     }

     ASTPtr where_expr;
-    ASTPtr ast_group_by_key;
+    ASTPtr group_by_key;
     ASTPtr recompression_codec;
-    std::vector<std::pair<String, ASTPtr>> group_by_aggregations;
+    ASTPtr group_by_assignments;

     if (mode == TTLMode::MOVE)
     {
@@ -2058,30 +2062,13 @@
     }
     else if (mode == TTLMode::GROUP_BY)
     {
-        if (!parser_expression_list.parse(pos, ast_group_by_key, expected))
+        if (!parser_keys_list.parse(pos, group_by_key, expected))
             return false;

         if (s_set.ignore(pos))
         {
-            while (true)
-            {
-                if (!group_by_aggregations.empty() && !s_comma.ignore(pos))
-                    break;
-
-                ASTPtr name;
-                ASTPtr value;
-                if (!parser_identifier.parse(pos, name, expected))
-                    return false;
-                if (!s_eq.ignore(pos))
-                    return false;
-                if (!parser_exp.parse(pos, value, expected))
-                    return false;
-
-                String name_str;
-                if (!tryGetIdentifierNameInto(name, name_str))
-                    return false;
-                group_by_aggregations.emplace_back(name_str, std::move(value));
-            }
+            if (!parser_assignment_list.parse(pos, group_by_assignments, expected))
+                return false;
         }
     }
     else if (mode == TTLMode::DELETE && s_where.ignore(pos))
@@ -2105,8 +2092,9 @@
     if (mode == TTLMode::GROUP_BY)
     {
-        ttl_element->group_by_key = std::move(ast_group_by_key->children);
-        ttl_element->group_by_aggregations = std::move(group_by_aggregations);
+        ttl_element->group_by_key = std::move(group_by_key->children);
+        if (group_by_assignments)
+            ttl_element->group_by_assignments = std::move(group_by_assignments->children);
     }

     if (mode == TTLMode::RECOMPRESS)
@@ -2141,4 +2129,31 @@ bool ParserIdentifierWithOptionalParameters::parseImpl(Pos & pos, ASTPtr & node,
     return false;
 }

+bool ParserAssignment::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
+{
+    auto assignment = std::make_shared<ASTAssignment>();
+    node = assignment;
+
+    ParserIdentifier p_identifier;
+    ParserToken s_equals(TokenType::Equals);
+    ParserExpression p_expression;
+
+    ASTPtr column;
+    if (!p_identifier.parse(pos, column, expected))
+        return false;
+
+    if (!s_equals.ignore(pos, expected))
+        return false;
+
+    ASTPtr expression;
+    if (!p_expression.parse(pos, expression, expected))
+        return false;
+
+    tryGetIdentifierNameInto(column, assignment->column_name);
+    if (expression)
+        assignment->children.push_back(expression);
+
+    return true;
+}
+
 }
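
Net effect of the parser change: the SET clause of a GROUP BY TTL now reuses the same col_name = expr assignment grammar as ALTER ... UPDATE, parsed as a comma-separated ParserList of ParserAssignment. A toy recursive-descent version of that assignment-list grammar, with expressions kept opaque up to the next top-level comma (the real parser delegates to ParserExpression instead):

#include <cctype>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// assignment_list := identifier '=' expression (',' identifier '=' expression)*
std::vector<std::pair<std::string, std::string>> parseAssignments(const std::string & s)
{
    std::vector<std::pair<std::string, std::string>> result;
    size_t pos = 0;
    auto skip_spaces = [&] { while (pos < s.size() && std::isspace(static_cast<unsigned char>(s[pos]))) ++pos; };
    while (pos < s.size())
    {
        skip_spaces();
        std::string name;
        while (pos < s.size() && (std::isalnum(static_cast<unsigned char>(s[pos])) || s[pos] == '_'))
            name += s[pos++];
        skip_spaces();
        if (name.empty() || pos >= s.size() || s[pos] != '=')
            throw std::runtime_error("expected: identifier '=' expression");
        ++pos; // consume '='
        int depth = 0;
        std::string expr;
        while (pos < s.size() && (depth > 0 || s[pos] != ','))
        {
            if (s[pos] == '(') ++depth;
            if (s[pos] == ')') --depth;
            expr += s[pos++];
        }
        result.emplace_back(name, expr);
        if (pos < s.size()) ++pos; // consume ','
    }
    return result;
}

int main()
{
    // Corresponds to: TTL d + INTERVAL 1 MONTH GROUP BY k SET x = max(x), y = sum(y)
    for (const auto & [name, expr] : parseAssignments("x = max(x), y = sum(y)"))
        std::cout << name << " :=" << expr << '\n';
}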

src/Parsers/ExpressionElementParsers.h

@@ -483,4 +483,12 @@ protected:
     bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
 };

+/// Part of the UPDATE command or TTL with GROUP BY of the form: col_name = expr
+class ParserAssignment : public IParserBase
+{
+protected:
+    const char * getName() const override { return "column assignment"; }
+    bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
+};
+
 }

src/Parsers/ParserAlterQuery.cpp

@@ -11,7 +11,6 @@
 #include <Parsers/ASTIndexDeclaration.h>
 #include <Parsers/ASTAlterQuery.h>
 #include <Parsers/ASTLiteral.h>
-#include <Parsers/ASTAssignment.h>
 #include <Parsers/parseDatabaseAndTableName.h>
@@ -651,34 +650,6 @@ bool ParserAlterCommandList::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
 }

-bool ParserAssignment::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
-{
-    auto assignment = std::make_shared<ASTAssignment>();
-    node = assignment;
-
-    ParserIdentifier p_identifier;
-    ParserToken s_equals(TokenType::Equals);
-    ParserExpression p_expression;
-
-    ASTPtr column;
-    if (!p_identifier.parse(pos, column, expected))
-        return false;
-
-    if (!s_equals.ignore(pos, expected))
-        return false;
-
-    ASTPtr expression;
-    if (!p_expression.parse(pos, expression, expected))
-        return false;
-
-    tryGetIdentifierNameInto(column, assignment->column_name);
-    if (expression)
-        assignment->children.push_back(expression);
-
-    return true;
-}
-
 bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
 {
     auto query = std::make_shared<ASTAlterQuery>();

src/Parsers/ParserAlterQuery.h

@@ -63,12 +63,4 @@ public:
 };

-/// Part of the UPDATE command of the form: col_name = expr
-class ParserAssignment : public IParserBase
-{
-protected:
-    const char * getName() const override { return "column assignment"; }
-    bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
-};
-
 }

src/Storages/MergeTree/IMergeTreeDataPart.cpp

@@ -1278,6 +1278,18 @@ bool IMergeTreeDataPart::checkAllTTLCalculated(const StorageMetadataPtr & metada
             return false;
     }

+    for (const auto & group_by_desc : metadata_snapshot->getGroupByTTLs())
+    {
+        if (!ttl_infos.group_by_ttl.count(group_by_desc.result_column))
+            return false;
+    }
+
+    for (const auto & rows_where_desc : metadata_snapshot->getRowsWhereTTLs())
+    {
+        if (!ttl_infos.rows_where_ttl.count(rows_where_desc.result_column))
+            return false;
+    }
+
     return true;
 }

src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp

@@ -17,13 +17,23 @@ void MergeTreeDataPartTTLInfos::update(const MergeTreeDataPartTTLInfos & other_i
         updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
     }

+    for (const auto & [name, ttl_info] : other_infos.rows_where_ttl)
+    {
+        rows_where_ttl[name].update(ttl_info);
+        updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
+    }
+
+    for (const auto & [name, ttl_info] : other_infos.group_by_ttl)
+    {
+        group_by_ttl[name].update(ttl_info);
+        updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
+    }
+
     for (const auto & [name, ttl_info] : other_infos.recompression_ttl)
         recompression_ttl[name].update(ttl_info);

     for (const auto & [expression, ttl_info] : other_infos.moves_ttl)
-    {
         moves_ttl[expression].update(ttl_info);
-    }

     table_ttl.update(other_infos.table_ttl);
     updatePartMinMaxTTL(table_ttl.min, table_ttl.max);
@@ -59,29 +69,41 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
         updatePartMinMaxTTL(table_ttl.min, table_ttl.max);
     }

+    auto fill_ttl_info_map = [this](const JSON & json_part, TTLInfoMap & ttl_info_map, bool update_min_max)
+    {
+        for (auto elem : json_part) // NOLINT
+        {
+            MergeTreeDataPartTTLInfo ttl_info;
+            ttl_info.min = elem["min"].getUInt();
+            ttl_info.max = elem["max"].getUInt();
+            String expression = elem["expression"].getString();
+            ttl_info_map.emplace(expression, ttl_info);
+
+            if (update_min_max)
+                updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
+        }
+    };
+
     if (json.has("moves"))
     {
         const JSON & moves = json["moves"];
-        for (auto move : moves) // NOLINT
-        {
-            MergeTreeDataPartTTLInfo ttl_info;
-            ttl_info.min = move["min"].getUInt();
-            ttl_info.max = move["max"].getUInt();
-            String expression = move["expression"].getString();
-            moves_ttl.emplace(expression, ttl_info);
-        }
+        fill_ttl_info_map(moves, moves_ttl, false);
     }
     if (json.has("recompression"))
     {
         const JSON & recompressions = json["recompression"];
-        for (auto recompression : recompressions) // NOLINT
-        {
-            MergeTreeDataPartTTLInfo ttl_info;
-            ttl_info.min = recompression["min"].getUInt();
-            ttl_info.max = recompression["max"].getUInt();
-            String expression = recompression["expression"].getString();
-            recompression_ttl.emplace(expression, ttl_info);
-        }
+        fill_ttl_info_map(recompressions, recompression_ttl, false);
+    }
+    if (json.has("group_by"))
+    {
+        const JSON & group_by = json["group_by"];
+        fill_ttl_info_map(group_by, group_by_ttl, true);
+    }
+    if (json.has("rows_where"))
+    {
+        const JSON & rows_where = json["rows_where"];
+        fill_ttl_info_map(rows_where, rows_where_ttl, true);
     }
 }
@@ -118,47 +140,52 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const
         writeIntText(table_ttl.max, out);
         writeString("}", out);
     }

+    auto write_infos = [&out](const TTLInfoMap & infos, const String & type, bool is_first)
+    {
+        if (!is_first)
+            writeString(",", out);
+
+        writeDoubleQuotedString(type, out);
+        writeString(":[", out);
+        for (auto it = infos.begin(); it != infos.end(); ++it)
+        {
+            if (it != infos.begin())
+                writeString(",", out);
+
+            writeString(R"({"expression":)", out);
+            writeString(doubleQuoteString(it->first), out);
+            writeString(R"(,"min":)", out);
+            writeIntText(it->second.min, out);
+            writeString(R"(,"max":)", out);
+            writeIntText(it->second.max, out);
+            writeString("}", out);
+        }
+        writeString("]", out);
+    };
+
+    bool is_first = columns_ttl.empty() && !table_ttl.min;
     if (!moves_ttl.empty())
     {
-        if (!columns_ttl.empty() || table_ttl.min)
-            writeString(",", out);
-        writeString(R"("moves":[)", out);
-        for (auto it = moves_ttl.begin(); it != moves_ttl.end(); ++it)
-        {
-            if (it != moves_ttl.begin())
-                writeString(",", out);
-
-            writeString(R"({"expression":)", out);
-            writeString(doubleQuoteString(it->first), out);
-            writeString(R"(,"min":)", out);
-            writeIntText(it->second.min, out);
-            writeString(R"(,"max":)", out);
-            writeIntText(it->second.max, out);
-            writeString("}", out);
-        }
-        writeString("]", out);
+        write_infos(moves_ttl, "moves", is_first);
+        is_first = false;
     }
+
     if (!recompression_ttl.empty())
     {
-        if (!moves_ttl.empty() || !columns_ttl.empty() || table_ttl.min)
-            writeString(",", out);
-
-        writeString(R"("recompression":[)", out);
-        for (auto it = recompression_ttl.begin(); it != recompression_ttl.end(); ++it)
-        {
-            if (it != recompression_ttl.begin())
-                writeString(",", out);
-
-            writeString(R"({"expression":)", out);
-            writeString(doubleQuoteString(it->first), out);
-            writeString(R"(,"min":)", out);
-            writeIntText(it->second.min, out);
-            writeString(R"(,"max":)", out);
-            writeIntText(it->second.max, out);
-            writeString("}", out);
-        }
-        writeString("]", out);
+        write_infos(recompression_ttl, "recompression", is_first);
+        is_first = false;
     }
+
+    if (!group_by_ttl.empty())
+    {
+        write_infos(group_by_ttl, "group_by", is_first);
+        is_first = false;
+    }
+
+    if (!rows_where_ttl.empty())
+        write_infos(rows_where_ttl, "rows_where", is_first);
+
     writeString("}", out);
 }

src/Storages/MergeTree/MergeTreeDataPartTTLInfo.h

@@ -45,14 +45,17 @@ struct MergeTreeDataPartTTLInfos
     time_t part_min_ttl = 0;
     time_t part_max_ttl = 0;

+    TTLInfoMap rows_where_ttl;
+
     TTLInfoMap moves_ttl;

     TTLInfoMap recompression_ttl;

+    TTLInfoMap group_by_ttl;
+
     /// Return the smallest max recompression TTL value
     time_t getMinimalMaxRecompressionTTL() const;

     void read(ReadBuffer & in);
     void write(WriteBuffer & out) const;
     void update(const MergeTreeDataPartTTLInfos & other_infos);
@@ -68,6 +71,7 @@ struct MergeTreeDataPartTTLInfos
     bool empty() const
     {
+        /// part_min_ttl is the minimum of the rows, rows_where and group_by TTLs
         return !part_min_ttl && moves_ttl.empty() && recompression_ttl.empty();
     }
 };

src/Storages/MergeTree/MergeTreeDataWriter.cpp

@ -12,6 +12,7 @@
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Poco/File.h> #include <Poco/File.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <DataStreams/ITTLAlgorithm.h>
#include <Parsers/queryToString.h> #include <Parsers/queryToString.h>
@ -91,31 +92,23 @@ void updateTTL(
const TTLDescription & ttl_entry, const TTLDescription & ttl_entry,
IMergeTreeDataPart::TTLInfos & ttl_infos, IMergeTreeDataPart::TTLInfos & ttl_infos,
DB::MergeTreeDataPartTTLInfo & ttl_info, DB::MergeTreeDataPartTTLInfo & ttl_info,
Block & block, const Block & block,
bool update_part_min_max_ttls) bool update_part_min_max_ttls)
{ {
bool remove_column = false; auto ttl_column = ITTLAlgorithm::executeExpressionAndGetColumn(ttl_entry.expression, block, ttl_entry.result_column);
if (!block.has(ttl_entry.result_column))
{
ttl_entry.expression->execute(block);
remove_column = true;
}
const auto & current = block.getByName(ttl_entry.result_column); if (const ColumnUInt16 * column_date = typeid_cast<const ColumnUInt16 *>(ttl_column.get()))
const IColumn * column = current.column.get();
if (const ColumnUInt16 * column_date = typeid_cast<const ColumnUInt16 *>(column))
{ {
const auto & date_lut = DateLUT::instance(); const auto & date_lut = DateLUT::instance();
for (const auto & val : column_date->getData()) for (const auto & val : column_date->getData())
ttl_info.update(date_lut.fromDayNum(DayNum(val))); ttl_info.update(date_lut.fromDayNum(DayNum(val)));
} }
else if (const ColumnUInt32 * column_date_time = typeid_cast<const ColumnUInt32 *>(column)) else if (const ColumnUInt32 * column_date_time = typeid_cast<const ColumnUInt32 *>(ttl_column.get()))
{ {
for (const auto & val : column_date_time->getData()) for (const auto & val : column_date_time->getData())
ttl_info.update(val); ttl_info.update(val);
} }
else if (const ColumnConst * column_const = typeid_cast<const ColumnConst *>(column)) else if (const ColumnConst * column_const = typeid_cast<const ColumnConst *>(ttl_column.get()))
{ {
if (typeid_cast<const ColumnUInt16 *>(&column_const->getDataColumn())) if (typeid_cast<const ColumnUInt16 *>(&column_const->getDataColumn()))
{ {
@ -134,9 +127,6 @@ void updateTTL(
if (update_part_min_max_ttls) if (update_part_min_max_ttls)
ttl_infos.updatePartMinMaxTTL(ttl_info.min, ttl_info.max); ttl_infos.updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
if (remove_column)
block.erase(ttl_entry.result_column);
} }
} }
@@ -383,6 +373,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
     if (metadata_snapshot->hasRowsTTL())
         updateTTL(metadata_snapshot->getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);

+    for (const auto & ttl_entry : metadata_snapshot->getGroupByTTLs())
+        updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.group_by_ttl[ttl_entry.result_column], block, true);
+
+    for (const auto & ttl_entry : metadata_snapshot->getRowsWhereTTLs())
+        updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.rows_where_ttl[ttl_entry.result_column], block, true);
+
     for (const auto & [name, ttl_entry] : metadata_snapshot->getColumnTTLs())
         updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.columns_ttl[name], block, true);
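writeTempPart now computes a separate min/max TTL info for every GROUP BY and conditional DELETE entry, keyed by the entry's result column. As an illustrative sketch (hypothetical table name, not part of this commit), a definition like the following would populate table_ttl, group_by_ttl and rows_where_ttl for each freshly written part:

CREATE TABLE ttl_infos_example (d DateTime, id UInt32, val UInt64)
ENGINE = MergeTree ORDER BY id
TTL d + INTERVAL 3 MONTH,                               -- tracked in table_ttl
    d + INTERVAL 1 MONTH DELETE WHERE id % 2 = 1,       -- tracked in rows_where_ttl
    d + INTERVAL 1 WEEK GROUP BY id SET val = sum(val); -- tracked in group_by_ttl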

View File

@@ -128,7 +128,7 @@ TTLTableDescription StorageInMemoryMetadata::getTableTTLs() const
 bool StorageInMemoryMetadata::hasAnyTableTTL() const
 {
-    return hasAnyMoveTTL() || hasRowsTTL() || hasAnyRecompressionTTL();
+    return hasAnyMoveTTL() || hasRowsTTL() || hasAnyRecompressionTTL() || hasAnyGroupByTTL() || hasAnyRowsWhereTTL();
 }

 TTLColumnsDescription StorageInMemoryMetadata::getColumnTTLs() const
@@ -151,6 +151,16 @@ bool StorageInMemoryMetadata::hasRowsTTL() const
     return table_ttl.rows_ttl.expression != nullptr;
 }

+TTLDescriptions StorageInMemoryMetadata::getRowsWhereTTLs() const
+{
+    return table_ttl.rows_where_ttl;
+}
+
+bool StorageInMemoryMetadata::hasAnyRowsWhereTTL() const
+{
+    return !table_ttl.rows_where_ttl.empty();
+}
+
 TTLDescriptions StorageInMemoryMetadata::getMoveTTLs() const
 {
     return table_ttl.move_ttl;
@@ -171,6 +181,16 @@ bool StorageInMemoryMetadata::hasAnyRecompressionTTL() const
     return !table_ttl.recompression_ttl.empty();
 }

+TTLDescriptions StorageInMemoryMetadata::getGroupByTTLs() const
+{
+    return table_ttl.group_by_ttl;
+}
+
+bool StorageInMemoryMetadata::hasAnyGroupByTTL() const
+{
+    return !table_ttl.group_by_ttl.empty();
+}
+
 ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(const NameSet & updated_columns) const
 {
     if (updated_columns.empty())

View File

@@ -109,6 +109,9 @@ struct StorageInMemoryMetadata
     TTLDescription getRowsTTL() const;
     bool hasRowsTTL() const;

+    TTLDescriptions getRowsWhereTTLs() const;
+    bool hasAnyRowsWhereTTL() const;
+
     /// Just wrapper for table TTLs, return moves (to disks or volumes) parts of
     /// table TTL.
     TTLDescriptions getMoveTTLs() const;
@@ -118,6 +121,10 @@ struct StorageInMemoryMetadata
     TTLDescriptions getRecompressionTTLs() const;
     bool hasAnyRecompressionTTL() const;

+    /// Just wrapper for table TTLs, return info about GROUP BY ttl
+    TTLDescriptions getGroupByTTLs() const;
+    bool hasAnyGroupByTTL() const;
+
     /// Returns columns, which will be needed to calculate dependencies (skip
     /// indices, TTL expressions) if we update @updated_columns set of columns.
     ColumnDependencies getColumnDependencies(const NameSet & updated_columns) const;

View File

@@ -68,6 +68,14 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
         {"recompression_ttl_info.expression", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
         {"recompression_ttl_info.min", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
         {"recompression_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
+
+        {"group_by_ttl_info.expression", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
+        {"group_by_ttl_info.min", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
+        {"group_by_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
+
+        {"rows_where_ttl_info.expression", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
+        {"rows_where_ttl_info.min", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
+        {"rows_where_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())}
     }
     )
 {
@@ -181,6 +189,8 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns_, const Sto
             columns_[i++]->insert(queryToString(part->default_codec->getCodecDesc()));

             add_ttl_info_map(part->ttl_infos.recompression_ttl);
+            add_ttl_info_map(part->ttl_infos.group_by_ttl);
+            add_ttl_info_map(part->ttl_infos.rows_where_ttl);

             /// _state column should be the latest.
             if (has_state_column)
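Those per-part infos are then exposed through the new system.parts columns. A usage sketch, assuming the hypothetical ttl_infos_example table from above exists in the current database:

SELECT name,
       group_by_ttl_info.expression, group_by_ttl_info.min, group_by_ttl_info.max,
       rows_where_ttl_info.expression, rows_where_ttl_info.min, rows_where_ttl_info.max
FROM system.parts
WHERE database = currentDatabase() AND table = 'ttl_infos_example' AND active;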

View File

@@ -1,15 +1,21 @@
 #include <Storages/TTLDescription.h>

+#include <AggregateFunctions/AggregateFunctionFactory.h>
 #include <Functions/IFunction.h>
 #include <Interpreters/ExpressionAnalyzer.h>
 #include <Interpreters/TreeRewriter.h>
+#include <Interpreters/InDepthNodeVisitor.h>
+#include <Interpreters/addTypeConversionToAST.h>
 #include <Parsers/ASTExpressionList.h>
 #include <Parsers/ASTFunction.h>
 #include <Parsers/ASTTTLElement.h>
 #include <Parsers/ASTIdentifier.h>
+#include <Parsers/ASTAssignment.h>
+#include <Parsers/ASTLiteral.h>
 #include <Storages/ColumnsDescription.h>
 #include <Interpreters/Context.h>
+#include <Parsers/queryToString.h>

 #include <DataTypes/DataTypeDate.h>
 #include <DataTypes/DataTypeDateTime.h>
@@ -77,6 +83,24 @@ void checkTTLExpression(const ExpressionActionsPtr & ttl_expression, const Strin
     }
 }

+class FindAggregateFunctionData
+{
+public:
+    using TypeToVisit = ASTFunction;
+    bool has_aggregate_function = false;
+
+    void visit(const ASTFunction & func, ASTPtr &)
+    {
+        /// Do not throw if an aggregate function is found inside another aggregate function,
+        /// because that is checked later, while the expressions are created.
+        if (AggregateFunctionFactory::instance().isAggregateFunctionName(func.name))
+            has_aggregate_function = true;
+    }
+};
+
+using FindAggregateFunctionFinderMatcher = OneTypeMatcher<FindAggregateFunctionData>;
+using FindAggregateFunctionVisitor = InDepthNodeVisitor<FindAggregateFunctionFinderMatcher, true>;
+
 }

 TTLDescription::TTLDescription(const TTLDescription & other)
@@ -182,11 +206,8 @@ TTLDescription TTLDescription::getTTLFromAST(
         if (ttl_element->group_by_key.size() > pk_columns.size())
             throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION);

-        NameSet primary_key_columns_set(pk_columns.begin(), pk_columns.end());
         NameSet aggregation_columns_set;
+        NameSet used_primary_key_columns_set;

-        for (const auto & column : primary_key.expression->getRequiredColumns())
-            primary_key_columns_set.insert(column);

         for (size_t i = 0; i < ttl_element->group_by_key.size(); ++i)
         {
@@ -194,61 +215,54 @@ TTLDescription TTLDescription::getTTLFromAST(
                 throw Exception(
                     "TTL Expression GROUP BY key should be a prefix of primary key",
                     ErrorCodes::BAD_TTL_EXPRESSION);
+
+            used_primary_key_columns_set.insert(pk_columns[i]);
         }

-        for (const auto & [name, value] : ttl_element->group_by_aggregations)
+        std::vector<std::pair<String, ASTPtr>> aggregations;
+        for (const auto & ast : ttl_element->group_by_assignments)
         {
-            if (primary_key_columns_set.count(name))
-                throw Exception(
-                    "Can not set custom aggregation for column in primary key in TTL Expression",
-                    ErrorCodes::BAD_TTL_EXPRESSION);
+            const auto assignment = ast->as<const ASTAssignment &>();
+            auto expression = assignment.expression();

-            aggregation_columns_set.insert(name);
+            FindAggregateFunctionVisitor::Data data{false};
+            FindAggregateFunctionVisitor(data).visit(expression);
+            if (!data.has_aggregate_function)
+                throw Exception(ErrorCodes::BAD_TTL_EXPRESSION,
+                    "Invalid expression for assignment of column {}. Should contain an aggregate function", assignment.column_name);
+
+            expression = addTypeConversionToAST(std::move(expression), columns.getPhysical(assignment.column_name).type->getName());
+            aggregations.emplace_back(assignment.column_name, std::move(expression));
+            aggregation_columns_set.insert(assignment.column_name);
         }

-        if (aggregation_columns_set.size() != ttl_element->group_by_aggregations.size())
+        if (aggregation_columns_set.size() != ttl_element->group_by_assignments.size())
             throw Exception(
                 "Multiple aggregations set for one column in TTL Expression",
                 ErrorCodes::BAD_TTL_EXPRESSION);

         result.group_by_keys = Names(pk_columns.begin(), pk_columns.begin() + ttl_element->group_by_key.size());

-        auto aggregations = ttl_element->group_by_aggregations;
-        for (size_t i = 0; i < pk_columns.size(); ++i)
+        const auto & primary_key_expressions = primary_key.expression_list_ast->children;
+
+        /// Wrap with the 'any' aggregate function those primary key columns,
+        /// which are not in the 'GROUP BY' key and were not set explicitly.
+        /// This is a separate step, because not all primary key columns are ordinary columns.
+        for (size_t i = ttl_element->group_by_key.size(); i < primary_key_expressions.size(); ++i)
         {
-            ASTPtr value = primary_key.expression_list_ast->children[i]->clone();
-            if (i >= ttl_element->group_by_key.size())
+            if (!aggregation_columns_set.count(pk_columns[i]))
             {
-                ASTPtr value_max = makeASTFunction("max", value->clone());
-                aggregations.emplace_back(value->getColumnName(), std::move(value_max));
-            }
-            if (value->as<ASTFunction>())
-            {
-                auto syntax_result = TreeRewriter(context).analyze(value, columns.getAllPhysical(), {}, {}, true);
-                auto expr_actions = ExpressionAnalyzer(value, syntax_result, context).getActions(false);
-                for (const auto & column : expr_actions->getRequiredColumns())
-                {
-                    if (i < ttl_element->group_by_key.size())
-                    {
-                        ASTPtr expr = makeASTFunction("any", std::make_shared<ASTIdentifier>(column));
-                        aggregations.emplace_back(column, std::move(expr));
-                    }
-                    else
-                    {
-                        ASTPtr expr = makeASTFunction("argMax", std::make_shared<ASTIdentifier>(column), value->clone());
-                        aggregations.emplace_back(column, std::move(expr));
-                    }
-                }
-            }
+                ASTPtr expr = makeASTFunction("any", primary_key_expressions[i]->clone());
+                aggregations.emplace_back(pk_columns[i], std::move(expr));
+                aggregation_columns_set.insert(pk_columns[i]);
+            }
         }

-        for (const auto & column : columns.getAllPhysical())
+        /// Wrap with the 'any' aggregate function the other columns, which were not set explicitly.
+        for (const auto & column : columns.getOrdinary())
         {
-            if (!primary_key_columns_set.count(column.name) && !aggregation_columns_set.count(column.name))
+            if (!aggregation_columns_set.count(column.name) && !used_primary_key_columns_set.count(column.name))
             {
                 ASTPtr expr = makeASTFunction("any", std::make_shared<ASTIdentifier>(column.name));
                 aggregations.emplace_back(column.name, std::move(expr));
@@ -280,8 +294,6 @@ TTLDescription TTLDescription::getTTLFromAST(
     }

     checkTTLExpression(result.expression, result.result_column);
-
-
     return result;
 }
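Two consequences of this rewrite are worth spelling out. First, every SET assignment must now contain an aggregate function somewhere in its expression, and the result is cast back to the column's type via addTypeConversionToAST. Second, any column that is neither in the GROUP BY key nor assigned explicitly is collapsed with any(), including primary key columns after the GROUP BY prefix (the old code used max()/argMax() for those). A sketch under these assumptions (hypothetical tables; error code 450 is BAD_TTL_EXPRESSION, as in the 01280 tests below):

-- rejected: the assignment contains no aggregate function
create table ttl_bad (a Int, x Int64, d DateTime) engine = MergeTree order by a
ttl d + interval 1 second group by a set x = x + 1; -- { serverError 450 }

-- accepted: sum(x) is an aggregate function; the remaining columns are
-- implicitly aggregated as any(b), any(d)
create table ttl_ok (a Int, b Int, x Int64, d DateTime) engine = MergeTree order by (a, b)
ttl d + interval 1 second group by a set x = sum(x);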
@@ -289,8 +301,10 @@ TTLDescription TTLDescription::getTTLFromAST(
 TTLTableDescription::TTLTableDescription(const TTLTableDescription & other)
     : definition_ast(other.definition_ast ? other.definition_ast->clone() : nullptr)
     , rows_ttl(other.rows_ttl)
+    , rows_where_ttl(other.rows_where_ttl)
     , move_ttl(other.move_ttl)
     , recompression_ttl(other.recompression_ttl)
+    , group_by_ttl(other.group_by_ttl)
 {
 }
@@ -305,8 +319,10 @@ TTLTableDescription & TTLTableDescription::operator=(const TTLTableDescription &
         definition_ast.reset();

     rows_ttl = other.rows_ttl;
+    rows_where_ttl = other.rows_where_ttl;
     move_ttl = other.move_ttl;
     recompression_ttl = other.recompression_ttl;
+    group_by_ttl = other.group_by_ttl;

     return *this;
 }
@@ -323,21 +339,33 @@ TTLTableDescription TTLTableDescription::getTTLForTableFromAST(
     result.definition_ast = definition_ast->clone();

-    bool seen_delete_ttl = false;
+    bool have_unconditional_delete_ttl = false;
     for (const auto & ttl_element_ptr : definition_ast->children)
     {
         auto ttl = TTLDescription::getTTLFromAST(ttl_element_ptr, columns, context, primary_key);
-        if (ttl.mode == TTLMode::DELETE || ttl.mode == TTLMode::GROUP_BY)
+        if (ttl.mode == TTLMode::DELETE)
         {
-            if (seen_delete_ttl)
-                throw Exception("More than one DELETE TTL expression is not allowed", ErrorCodes::BAD_TTL_EXPRESSION);
-            result.rows_ttl = ttl;
-            seen_delete_ttl = true;
+            if (!ttl.where_expression)
+            {
+                if (have_unconditional_delete_ttl)
+                    throw Exception("More than one DELETE TTL expression without WHERE expression is not allowed", ErrorCodes::BAD_TTL_EXPRESSION);
+
+                have_unconditional_delete_ttl = true;
+                result.rows_ttl = ttl;
+            }
+            else
+            {
+                result.rows_where_ttl.emplace_back(std::move(ttl));
+            }
         }
         else if (ttl.mode == TTLMode::RECOMPRESS)
         {
             result.recompression_ttl.emplace_back(std::move(ttl));
         }
+        else if (ttl.mode == TTLMode::GROUP_BY)
+        {
+            result.group_by_ttl.emplace_back(std::move(ttl));
+        }
         else
         {
             result.move_ttl.emplace_back(std::move(ttl));
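With this classification, the different TTL modes can now be combined freely in one table; only the unconditional DELETE remains limited to one per table. A sketch (hypothetical table and disk names):

-- still rejected: two unconditional DELETE TTLs
CREATE TABLE ttl_two_deletes (d DateTime, id UInt32)
ENGINE = MergeTree ORDER BY id
TTL d + INTERVAL 1 MONTH, d + INTERVAL 2 MONTH; -- error 450

-- accepted: each clause lands in its own container
CREATE TABLE ttl_modes_example (d DateTime, id UInt32, val UInt64)
ENGINE = MergeTree ORDER BY id
TTL d + INTERVAL 2 MONTH GROUP BY id SET val = sum(val), -- group_by_ttl
    d + INTERVAL 1 MONTH RECOMPRESS CODEC(ZSTD(10)),     -- recompression_ttl
    d + INTERVAL 1 WEEK TO DISK 'external';              -- move_ttl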

View File

@@ -99,14 +99,19 @@ struct TTLTableDescription
     /// ^~~~~~~~~~~~~~~definition~~~~~~~~~~~~~~~^
     ASTPtr definition_ast;

-    /// Rows removing TTL
+    /// Unconditional main TTL for removing rows. At most one per table.
     TTLDescription rows_ttl;

+    /// Conditional TTLs for removing rows (DELETE ... WHERE).
+    TTLDescriptions rows_where_ttl;
+
     /// Moving data TTL (to other disks or volumes)
     TTLDescriptions move_ttl;

     TTLDescriptions recompression_ttl;

+    TTLDescriptions group_by_ttl;
+
     TTLTableDescription() = default;
     TTLTableDescription(const TTLTableDescription & other);
     TTLTableDescription & operator=(const TTLTableDescription & other);

View File

@@ -9,6 +9,11 @@ cluster = ClickHouseCluster(__file__)
 node1 = cluster.add_instance('node1', with_zookeeper=True)
 node2 = cluster.add_instance('node2', with_zookeeper=True)
+node3 = cluster.add_instance('node3', with_zookeeper=True)
+node4 = cluster.add_instance('node4', with_zookeeper=True, image='yandex/clickhouse-server', tag='20.12.4.5', stay_alive=True, with_installed_binary=True)
+node5 = cluster.add_instance('node5', with_zookeeper=True, image='yandex/clickhouse-server', tag='20.12.4.5', stay_alive=True, with_installed_binary=True)
+node6 = cluster.add_instance('node6', with_zookeeper=True, image='yandex/clickhouse-server', tag='20.12.4.5', stay_alive=True, with_installed_binary=True)

 @pytest.fixture(scope="module")
 def started_cluster():
@@ -329,3 +334,73 @@ def test_ttl_empty_parts(started_cluster):
     error_msg = '<Error> default.test_ttl_empty_parts (ReplicatedMergeTreeCleanupThread)'
     assert not node1.contains_in_log(error_msg)
     assert not node2.contains_in_log(error_msg)
+
+@pytest.mark.parametrize(
+    ('node_left', 'node_right', 'num_run'),
+    [(node1, node2, 0), (node3, node4, 1), (node5, node6, 2)]
+)
+def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
+    drop_table([node_left, node_right], "test_ttl_delete")
+    drop_table([node_left, node_right], "test_ttl_group_by")
+    drop_table([node_left, node_right], "test_ttl_where")
+    for node in [node_left, node_right]:
+        node.query(
+            '''
+                CREATE TABLE test_ttl_delete(date DateTime, id UInt32)
+                ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_delete_{suff}', '{replica}')
+                ORDER BY id PARTITION BY toDayOfMonth(date)
+                TTL date + INTERVAL 3 SECOND
+            '''.format(suff=num_run, replica=node.name))
+
+        node.query(
+            '''
+                CREATE TABLE test_ttl_group_by(date DateTime, id UInt32, val UInt64)
+                ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_group_by_{suff}', '{replica}')
+                ORDER BY id PARTITION BY toDayOfMonth(date)
+                TTL date + INTERVAL 3 SECOND GROUP BY id SET val = sum(val)
+            '''.format(suff=num_run, replica=node.name))
+
+        node.query(
+            '''
+                CREATE TABLE test_ttl_where(date DateTime, id UInt32)
+                ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_where_{suff}', '{replica}')
+                ORDER BY id PARTITION BY toDayOfMonth(date)
+                TTL date + INTERVAL 3 SECOND DELETE WHERE id % 2 = 1
+            '''.format(suff=num_run, replica=node.name))
+
+    node_left.query("INSERT INTO test_ttl_delete VALUES (now(), 1)")
+    node_left.query("INSERT INTO test_ttl_delete VALUES (toDateTime('2100-10-11 10:00:00'), 2)")
+    node_right.query("INSERT INTO test_ttl_delete VALUES (now(), 3)")
+    node_right.query("INSERT INTO test_ttl_delete VALUES (toDateTime('2100-10-11 10:00:00'), 4)")
+
+    node_left.query("INSERT INTO test_ttl_group_by VALUES (now(), 0, 1)")
+    node_left.query("INSERT INTO test_ttl_group_by VALUES (now(), 0, 2)")
+    node_right.query("INSERT INTO test_ttl_group_by VALUES (now(), 0, 3)")
+    node_right.query("INSERT INTO test_ttl_group_by VALUES (now(), 0, 4)")
+
+    node_left.query("INSERT INTO test_ttl_where VALUES (now(), 1)")
+    node_left.query("INSERT INTO test_ttl_where VALUES (now(), 2)")
+    node_right.query("INSERT INTO test_ttl_where VALUES (now(), 3)")
+    node_right.query("INSERT INTO test_ttl_where VALUES (now(), 4)")
+
+    if node_left.with_installed_binary:
+        node_left.restart_with_latest_version()
+
+    if node_right.with_installed_binary:
+        node_right.restart_with_latest_version()
+
+    time.sleep(5) # Wait for TTL
+
+    node_right.query("OPTIMIZE TABLE test_ttl_delete FINAL")
+    node_right.query("OPTIMIZE TABLE test_ttl_group_by FINAL")
+    node_right.query("OPTIMIZE TABLE test_ttl_where FINAL")
+
+    assert node_left.query("SELECT id FROM test_ttl_delete ORDER BY id") == "2\n4\n"
+    assert node_right.query("SELECT id FROM test_ttl_delete ORDER BY id") == "2\n4\n"
+
+    assert node_left.query("SELECT val FROM test_ttl_group_by ORDER BY id") == "10\n"
+    assert node_right.query("SELECT val FROM test_ttl_group_by ORDER BY id") == "10\n"
+
+    assert node_left.query("SELECT id FROM test_ttl_where ORDER BY id") == "2\n4\n"
+    assert node_right.query("SELECT id FROM test_ttl_where ORDER BY id") == "2\n4\n"

View File

@@ -1,20 +1,26 @@
+ttl_01280_1
 1 1 0 4
 1 2 3 7
 1 3 0 5
 2 1 0 1
 2 1 20 1
+ttl_01280_2
 1 1 [0,2,3] 4
 1 1 [5,4,1] 13
 1 3 [1,0,1,0] 17
 2 1 [3,1,0,3] 8
 3 1 [2,4,5] 8
+ttl_01280_3
 1 1 0 4
-1 3 10 6
+1 1 10 6
 2 1 0 3
-3 5 8 2
+3 1 8 2
+ttl_01280_4
 1 1 0 4
-3 3 13 9
+10 2 13 9
+ttl_01280_5
 1 2 7 5
 2 3 6 5
-1 2 3 5
-2 3 3 5
+ttl_01280_6
+1 5 3 5
+2 10 3 5

View File

@@ -14,6 +14,7 @@ function optimize()
     done
 }

+echo "ttl_01280_1"
 $CLICKHOUSE_CLIENT -n --query "
 create table ttl_01280_1 (a Int, b Int, x Int, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second delete where x % 10 == 0 and y > 5;
 insert into ttl_01280_1 values (1, 1, 0, 4, now() + 10);
@@ -30,6 +31,7 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_1 ORDER BY a, b, x,
 $CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_2"

+echo "ttl_01280_2"
 $CLICKHOUSE_CLIENT -n --query "
 create table ttl_01280_2 (a Int, b Int, x Array(Int32), y Double, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b set x = minForEach(x), y = sum(y), d = max(d);
 insert into ttl_01280_2 values (1, 1, array(0, 2, 3), 4, now() + 10);
@@ -48,8 +50,9 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_2 ORDER BY a, b, x,
 $CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_3"

+echo "ttl_01280_3"
 $CLICKHOUSE_CLIENT -n --query "
-create table ttl_01280_3 (a Int, b Int, x Int64, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set x = argMax(x, d), y = argMax(y, d), d = max(d);
+create table ttl_01280_3 (a Int, b Int, x Int64, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set b = min(b), x = argMax(x, d), y = argMax(y, d), d = max(d);
 insert into ttl_01280_3 values (1, 1, 0, 4, now() + 10);
 insert into ttl_01280_3 values (1, 1, 10, 6, now() + 1);
 insert into ttl_01280_3 values (1, 2, 3, 7, now());
@@ -66,6 +69,7 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_3 ORDER BY a, b, x,
 $CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_4"

+echo "ttl_01280_4"
 $CLICKHOUSE_CLIENT -n --query "
 create table ttl_01280_4 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), -(a + b)) ttl d + interval 1 second group by toDate(d) set x = sum(x), y = max(y);
 insert into ttl_01280_4 values (1, 1, 0, 4, now() + 10);
@@ -80,7 +84,8 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_4 ORDER BY a, b, x,
 $CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_5"

-$CLICKHOUSE_CLIENT -n --query "create table ttl_01280_5 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a set x = sum(x);
+echo "ttl_01280_5"
+$CLICKHOUSE_CLIENT -n --query "create table ttl_01280_5 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a set x = sum(x), b = argMax(b, -b);
 insert into ttl_01280_5 values (1, 2, 3, 5, now());
 insert into ttl_01280_5 values (2, 10, 1, 5, now());
 insert into ttl_01280_5 values (2, 3, 5, 5, now());
@@ -92,6 +97,7 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_5 ORDER BY a, b, x,
 $CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_6"

+echo "ttl_01280_6"
 $CLICKHOUSE_CLIENT -n --query "
 create table ttl_01280_6 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a;
 insert into ttl_01280_6 values (1, 2, 3, 5, now());

View File

@@ -1,7 +1,4 @@
 create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by x set y = max(y); -- { serverError 450}
 create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by b set y = max(y); -- { serverError 450}
 create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b, x set y = max(y); -- { serverError 450}
-create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set b = min(b), y = max(y); -- { serverError 450}
 create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b set y = max(y), y = max(y); -- { serverError 450}
-create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a) ttl d + interval 1 second group by toDate(d), a set d = min(d), b = max(b); -- { serverError 450}
-create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (d, -(a + b)) ttl d + interval 1 second group by d, -(a + b) set a = sum(a), b = min(b); -- { serverError 450}
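The deleted cases document the relaxed rule: explicitly assigning a primary key column that is outside the GROUP BY prefix (as ttl_01280_3 and ttl_01280_5 above now do) is no longer an error. For example, this former failure (renamed here for illustration) is now accepted:

create table ttl_01280_allowed (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set b = min(b), y = max(y);

Only a GROUP BY key that is not a prefix of the primary key and duplicate assignments still fail with error 450.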

View File

@@ -0,0 +1,4 @@
2020-01-01 00:00:00 3
2020-01-01 00:00:00 2020-01-01 00:00:00 111
1
0

View File

@@ -0,0 +1,78 @@
DROP TABLE IF EXISTS derived_metrics_local;
CREATE TABLE derived_metrics_local
(
timestamp DateTime,
bytes UInt64
)
ENGINE=SummingMergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (toStartOfHour(timestamp), timestamp)
TTL toStartOfHour(timestamp) + INTERVAL 1 HOUR GROUP BY toStartOfHour(timestamp)
SET bytes=max(bytes);
INSERT INTO derived_metrics_local values('2020-01-01 00:00:00', 1);
INSERT INTO derived_metrics_local values('2020-01-01 00:01:00', 3);
INSERT INTO derived_metrics_local values('2020-01-01 00:02:00', 2);
OPTIMIZE TABLE derived_metrics_local FINAL;
SELECT * FROM derived_metrics_local;
DROP TABLE derived_metrics_local;
CREATE TABLE derived_metrics_local
(
timestamp DateTime,
timestamp_h DateTime materialized toStartOfHour(timestamp),
bytes UInt64
)
ENGINE=SummingMergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (timestamp_h, timestamp)
TTL toStartOfHour(timestamp) + INTERVAL 1 HOUR GROUP BY timestamp_h
SET bytes=max(bytes), timestamp = toStartOfHour(any(timestamp));
INSERT INTO derived_metrics_local values('2020-01-01 00:01:00', 111);
INSERT INTO derived_metrics_local values('2020-01-01 00:19:22', 22);
INSERT INTO derived_metrics_local values('2020-01-01 00:59:02', 1);
OPTIMIZE TABLE derived_metrics_local FINAL;
SELECT timestamp, timestamp_h, bytes FROM derived_metrics_local;
DROP TABLE IF EXISTS derived_metrics_local;
CREATE TABLE derived_metrics_local
(
timestamp DateTime,
bytes UInt64 TTL toStartOfHour(timestamp) + INTERVAL 1 HOUR
)
ENGINE=MergeTree()
ORDER BY (toStartOfHour(timestamp), timestamp)
SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO derived_metrics_local values('2020-01-01 00:01:00', 111) ('2020-01-01 00:19:22', 22) ('2100-01-01 00:19:22', 1);
OPTIMIZE TABLE derived_metrics_local FINAL;
SELECT sum(bytes) FROM derived_metrics_local;
DROP TABLE IF EXISTS derived_metrics_local;
CREATE TABLE derived_metrics_local
(
timestamp DateTime,
bytes UInt64
)
ENGINE=MergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (toStartOfHour(timestamp), timestamp)
TTL toStartOfHour(timestamp) + INTERVAL 1 HOUR
SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO derived_metrics_local values('2020-01-01 00:01:00', 111);
INSERT INTO derived_metrics_local values('2020-01-01 00:19:22', 22);
INSERT INTO derived_metrics_local values('2020-01-01 00:59:02', 1);
OPTIMIZE TABLE derived_metrics_local FINAL;
SELECT count() FROM derived_metrics_local;
DROP TABLE IF EXISTS derived_metrics_local;

View File

@@ -0,0 +1,22 @@
TTL WHERE
1970-10-10 2
1970-10-10 5
1970-10-10 8
2000-10-10 1
2000-10-10 2
2000-10-10 4
2000-10-10 5
2000-10-10 7
2000-10-10 8
TTL GROUP BY
1970-10-01 0 4950
2000-10-01 0 450
2000-10-01 1 460
2000-10-01 2 470
2000-10-01 3 480
2000-10-01 4 490
2000-10-01 5 500
2000-10-01 6 510
2000-10-01 7 520
2000-10-01 8 530
2000-10-01 9 540

View File

@@ -0,0 +1,44 @@
SELECT 'TTL WHERE';
DROP TABLE IF EXISTS ttl_where;
CREATE TABLE ttl_where
(
`d` Date,
`i` UInt32
)
ENGINE = MergeTree
ORDER BY tuple()
TTL d + toIntervalYear(10) DELETE WHERE i % 3 = 0,
d + toIntervalYear(40) DELETE WHERE i % 3 = 1;
-- This test will fail at 2040-10-10
INSERT INTO ttl_where SELECT toDate('2000-10-10'), number FROM numbers(10);
INSERT INTO ttl_where SELECT toDate('1970-10-10'), number FROM numbers(10);
OPTIMIZE TABLE ttl_where FINAL;
SELECT * FROM ttl_where ORDER BY d, i;
DROP TABLE ttl_where;
SELECT 'TTL GROUP BY';
DROP TABLE IF EXISTS ttl_group_by;
CREATE TABLE ttl_group_by
(
`d` Date,
`i` UInt32,
`v` UInt64
)
ENGINE = MergeTree
ORDER BY (toStartOfMonth(d), i % 10)
TTL d + toIntervalYear(10) GROUP BY toStartOfMonth(d), i % 10 SET d = any(toStartOfMonth(d)), i = any(i % 10), v = sum(v),
d + toIntervalYear(40) GROUP BY toStartOfMonth(d) SET d = any(toStartOfMonth(d)), v = sum(v);
INSERT INTO ttl_group_by SELECT toDate('2000-10-10'), number, number FROM numbers(100);
INSERT INTO ttl_group_by SELECT toDate('1970-10-10'), number, number FROM numbers(100);
OPTIMIZE TABLE ttl_group_by FINAL;
SELECT * FROM ttl_group_by ORDER BY d, i;
DROP TABLE ttl_group_by;