add query 'ALTER ... MATERIALIZE TTL'

This commit is contained in:
CurtizJ 2020-01-22 16:24:20 +03:00
parent 9fe94ca5f7
commit 14f0b9e137
16 changed files with 157 additions and 1 deletions

View File

@ -23,6 +23,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_COLUMN;
extern const int SUPPORT_IS_DISABLED;
extern const int INCORRECT_QUERY;
}
@ -65,7 +66,13 @@ BlockIO InterpreterAlterQuery::execute()
partition_commands.emplace_back(std::move(*partition_command));
}
else if (auto mut_command = MutationCommand::parse(command_ast))
{
if (mut_command->type == MutationCommand::MATERIALIZE_TTL && !table->hasAnyTTL())
throw Exception("Cannot MATERIALIZE TTL as there is no TTL set for table "
+ table->getStorageID().getNameForLogs(), ErrorCodes::INCORRECT_QUERY);
mutation_commands.emplace_back(std::move(*mut_command));
}
else if (auto live_view_command = LiveViewCommand::parse(command_ast))
live_view_commands.emplace_back(std::move(*live_view_command));
else

View File

@ -381,6 +381,24 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
const auto required_columns = syntax_result->requiredSourceColumns();
affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns));
}
else if (command.type == MutationCommand::MATERIALIZE_TTL)
{
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);
auto required_columns_for_ttl = storage->getColumnsRequiredForTTL();
for (const auto & column_name : required_columns_for_ttl)
stages.back().column_to_updated.emplace(column_name, std::make_shared<ASTIdentifier>(column_name));
auto updated_columns_by_ttl = storage->getColumnsUpdatedByTTL();
for (const auto & column_name : updated_columns_by_ttl)
{
stages.back().column_to_updated.emplace(column_name, std::make_shared<ASTIdentifier>(column_name));
affected_indices_columns.insert(column_name);
}
}
else
throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
}

View File

@ -261,6 +261,11 @@ void ASTAlterCommand::formatImpl(
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY TTL " << (settings.hilite ? hilite_none : "");
ttl->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::MATERIALIZE_TTL)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MATERIALIZE TTL"
<< (settings.hilite ? hilite_none : "");
}
else if (type == ASTAlterCommand::MODIFY_SETTING)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY SETTING " << (settings.hilite ? hilite_none : "");

View File

@ -31,6 +31,7 @@ public:
COMMENT_COLUMN,
MODIFY_ORDER_BY,
MODIFY_TTL,
MATERIALIZE_TTL,
MODIFY_SETTING,
ADD_INDEX,

View File

@ -29,6 +29,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_comment_column("COMMENT COLUMN");
ParserKeyword s_modify_order_by("MODIFY ORDER BY");
ParserKeyword s_modify_ttl("MODIFY TTL");
ParserKeyword s_materialize_ttl("MATERIALIZE TTL");
ParserKeyword s_modify_setting("MODIFY SETTING");
ParserKeyword s_add_index("ADD INDEX");
@ -455,6 +456,10 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
return false;
command->type = ASTAlterCommand::MODIFY_TTL;
}
else if (s_materialize_ttl.ignore(pos, expected))
{
command->type = ASTAlterCommand::MATERIALIZE_TTL;
}
else if (s_modify_setting.ignore(pos, expected))
{
if (!parser_settings.parse(pos, command->settings_changes, expected))

View File

@ -116,6 +116,9 @@ public:
virtual bool hasEvenlyDistributedRead() const { return false; }
/// Returns true if a table TTL, any column TTL or any move TTL is set.
/// This default implementation always returns false.
virtual bool hasAnyTTL() const { return false; }
/// Optional size information of each physical column.
/// Currently it's only used by the MergeTree family for query optimizations.
using ColumnSizeByName = std::unordered_map<std::string, ColumnSize>;
@ -440,6 +443,12 @@ public:
/// Returns names of primary key + secondary sorting columns
virtual Names getSortingKeyColumns() const { return {}; }
/// Returns the names of the columns required to calculate TTL expressions.
/// This default implementation returns an empty list.
virtual Names getColumnsRequiredForTTL() const { return {}; }
/// Returns the names of the columns that could be updated by applying TTL rules.
/// This default implementation returns an empty list.
virtual Names getColumnsUpdatedByTTL() const { return {}; }
/// Returns storage policy if storage supports it
virtual StoragePolicyPtr getStoragePolicy() const { return {}; }

View File

@ -3905,4 +3905,36 @@ bool MergeTreeData::moveParts(CurrentlyMovingPartsTagger && moving_tagger)
return true;
}
/// Gathers the columns required by the TTL expressions of this table:
/// the table TTL (when one is set), every column TTL and every move TTL.
/// Names are accumulated in a NameSet before being copied into the result.
Names MergeTreeData::getColumnsRequiredForTTL() const
{
    NameSet unique_columns;

    /// Adds all columns required by a single TTL expression to the accumulated set.
    const auto collect_from = [&unique_columns](const auto & ttl_expression)
    {
        const auto required = ttl_expression->getRequiredColumns();
        unique_columns.insert(required.begin(), required.end());
    };

    if (hasTableTTL())
        collect_from(ttl_table_entry.expression);

    for (const auto & name_and_entry : column_ttl_entries_by_name)
        collect_from(name_and_entry.second.expression);

    for (const auto & move_entry : move_ttl_entries)
        collect_from(move_entry.expression);

    Names result(unique_columns.begin(), unique_columns.end());
    return result;
}
/// Lists the columns that applying TTL rules may update.
/// Without a table TTL only the columns that have their own TTL entry are reported;
/// with a table TTL every physical column of the table is reported.
Names MergeTreeData::getColumnsUpdatedByTTL() const
{
    if (!hasTableTTL())
    {
        Names columns_with_ttl;
        for (const auto & name_and_entry : column_ttl_entries_by_name)
            columns_with_ttl.push_back(name_and_entry.first);
        return columns_with_ttl;
    }

    return getColumns().getAllPhysical().getNames();
}
}

View File

@ -354,6 +354,8 @@ public:
Names getColumnsRequiredForSampling() const override { return columns_required_for_sampling; }
Names getColumnsRequiredForFinal() const override { return sorting_key_expr->getRequiredColumns(); }
Names getSortingKeyColumns() const override { return sorting_key_columns; }
Names getColumnsRequiredForTTL() const override;
Names getColumnsUpdatedByTTL() const override;
StoragePolicyPtr getStoragePolicy() const override { return storage_policy; }
@ -580,6 +582,7 @@ public:
bool hasAnyColumnTTL() const { return !column_ttl_entries_by_name.empty(); }
bool hasAnyMoveTTL() const { return !move_ttl_entries.empty(); }
bool hasRowsTTL() const { return !rows_ttl_entry.isEmpty(); }
/// True if a rows TTL, any move TTL or any column TTL is defined for this table.
bool hasAnyTTL() const override { return hasRowsTTL() || hasAnyMoveTTL() || hasAnyColumnTTL(); }
/// Check that the part is not broken and calculate the checksums for it if they are not present.
MutableDataPartPtr loadPartAndFixMetadata(const DiskPtr & disk, const String & relative_path);

View File

@ -936,6 +936,16 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
throw Exception("Trying to mutate " + toString(future_part.parts.size()) + " parts, not one. "
"This is a bug.", ErrorCodes::LOGICAL_ERROR);
bool need_remove_expired_values = false;
for (const auto & command : commands)
{
if (command.type == MutationCommand::MATERIALIZE_TTL)
{
need_remove_expired_values = true;
break;
}
}
CurrentMetrics::Increment num_mutations{CurrentMetrics::PartMutation};
const auto & source_part = future_part.parts[0];
auto storage_from_source_part = StorageFromMergeTreeDataPart::create(source_part);
@ -966,6 +976,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, space_reservation->getDisk(), future_part.name, future_part.part_info);
new_data_part->relative_path = "tmp_mut_" + future_part.name;
new_data_part->is_temp = true;
new_data_part->ttl_infos = source_part->ttl_infos;
@ -1004,6 +1015,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
in = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(in, data.primary_key_and_skip_indices_expr));
if (need_remove_expired_values)
in = std::make_shared<TTLBlockInputStream>(in, data, new_data_part, time(nullptr), true);
MergeTreeDataPart::MinMaxIndex minmax_idx;
MergedBlockOutputStream out(data, new_part_tmp_path, all_columns, compression_codec);
@ -1085,12 +1099,16 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
IDataType::SubstreamPath stream_path;
entry.type->enumerateStreams(callback, stream_path);
}
for (const auto & index : indices_to_recalc)
{
files_to_skip.insert(index->getFileName() + ".idx");
files_to_skip.insert(index->getFileName() + mrk_extension);
}
if (!need_remove_expired_values)
files_to_skip.insert("ttl.txt");
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(source_part->getFullPath()); dir_it != dir_end; ++dir_it)
{
@ -1105,6 +1123,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
merge_entry->columns_written = all_columns.size() - updated_header.columns();
if (need_remove_expired_values)
in = std::make_shared<TTLBlockInputStream>(in, data, new_data_part, time(nullptr), true);
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
data,
@ -1136,6 +1157,17 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
new_data_part->checksums = source_part->checksums;
new_data_part->checksums.add(std::move(changed_checksums));
if (!new_data_part->ttl_infos.empty())
{
/// Write a file with ttl infos in json format.
WriteBufferFromFile out_ttl(new_part_tmp_path + "ttl.txt", 4096);
HashingWriteBuffer out_hashing(out_ttl);
new_data_part->ttl_infos.write(out_hashing);
new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count();
new_data_part->checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
}
{
/// Write file with checksums.
WriteBufferFromFile out_checksums(new_part_tmp_path + "checksums.txt", 4096);

View File

@ -47,6 +47,10 @@ public:
return part->storage.mayBenefitFromIndexForIn(left_in_operand, query_context);
}
/// TTL-related queries are forwarded to the storage that owns the wrapped part.
Names getColumnsRequiredForTTL() const override { return part->storage.getColumnsRequiredForTTL(); }
Names getColumnsUpdatedByTTL() const override { return part->storage.getColumnsUpdatedByTTL(); }
bool hasAnyTTL() const override { return part->storage.hasAnyTTL(); }
protected:
StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
: IStorage(getIDFromPart(part_), part_->storage.getVirtuals())

View File

@ -55,6 +55,13 @@ std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command)
res.index_name = command->index->as<ASTIdentifier &>().name;
return res;
}
else if (command->type == ASTAlterCommand::MATERIALIZE_TTL)
{
MutationCommand res;
res.ast = command->ptr();
res.type = MATERIALIZE_TTL;
return res;
}
else
return {};
}

View File

@ -25,7 +25,8 @@ struct MutationCommand
EMPTY, /// Not used.
DELETE,
UPDATE,
MATERIALIZE_INDEX
MATERIALIZE_INDEX,
MATERIALIZE_TTL
};
Type type = EMPTY;

View File

@ -778,6 +778,8 @@ bool StorageMergeTree::tryMutatePart()
tagger->reserved_space, table_lock_holder);
renameTempPartAndReplace(new_part);
removeEmptyColumnsFromPart(new_part);
tagger->is_successful = true;
write_part_log({});

View File

@ -1238,6 +1238,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
{
new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands, *merge_entry, global_context, reserved_space, table_lock);
renameTempPartAndReplace(new_part, nullptr, &transaction);
removeEmptyColumnsFromPart(new_part);
try
{

View File

@ -0,0 +1,3 @@
Cannot MATERIALIZE TTL
2100-10-10 3
2100-10-10 4

View File

@ -0,0 +1,26 @@
#!/usr/bin/env bash
# Functional test for `ALTER TABLE ... MATERIALIZE TTL` on a MergeTree table.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Quote the directory so that a checkout path containing spaces or glob
# characters does not break sourcing of the helper scripts.
. "$CURDIR"/../shell_config.sh
. "$CURDIR"/mergetree_mutations.lib

${CLICKHOUSE_CLIENT} -q "drop table if exists ttl;"

# The table is created without any TTL; each insert goes to the same partition expression value.
${CLICKHOUSE_CLIENT} -q "create table ttl (d Date, a Int) engine = MergeTree order by a partition by toDayOfMonth(d);"
${CLICKHOUSE_CLIENT} -q "insert into ttl values (toDateTime('2000-10-10 00:00:00'), 1);"
${CLICKHOUSE_CLIENT} -q "insert into ttl values (toDateTime('2000-10-10 00:00:00'), 2);"
${CLICKHOUSE_CLIENT} -q "insert into ttl values (toDateTime('2100-10-10 00:00:00'), 3);"
${CLICKHOUSE_CLIENT} -q "insert into ttl values (toDateTime('2100-10-10 00:00:00'), 4);"

# No TTL is defined yet, so the query must fail; only the start of the error message is printed.
${CLICKHOUSE_CLIENT} -q "alter table ttl materialize ttl" --server_logs_file=/dev/null 2>&1 | grep -o "Cannot MATERIALIZE TTL"

# Define a table TTL and apply it to the already inserted data.
${CLICKHOUSE_CLIENT} -q "alter table ttl modify ttl d + interval 1 day"
${CLICKHOUSE_CLIENT} -q "alter table ttl materialize ttl"

# NOTE(review): presumably the four inserts occupy block numbers 1-4, which makes this mutation number 5 - confirm.
wait_for_mutation "ttl" "mutation_5.txt"

# Only the rows dated 2100-10-10 are expected to remain.
${CLICKHOUSE_CLIENT} -q "select * from ttl order by a"
${CLICKHOUSE_CLIENT} -q "drop table if exists ttl"