mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
TTL in storage in memory metadata
This commit is contained in:
parent
7064a366e2
commit
ed8f3b2fc4
@ -20,10 +20,12 @@ namespace ErrorCodes
|
||||
TTLBlockInputStream::TTLBlockInputStream(
|
||||
const BlockInputStreamPtr & input_,
|
||||
const MergeTreeData & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const MergeTreeData::MutableDataPartPtr & data_part_,
|
||||
time_t current_time_,
|
||||
bool force_)
|
||||
: storage(storage_)
|
||||
, metadata_snapshot(metadata_snapshot_)
|
||||
, data_part(data_part_)
|
||||
, current_time(current_time_)
|
||||
, force(force_)
|
||||
@ -34,11 +36,11 @@ TTLBlockInputStream::TTLBlockInputStream(
|
||||
children.push_back(input_);
|
||||
header = children.at(0)->getHeader();
|
||||
|
||||
const auto & storage_columns = storage.getColumns();
|
||||
const auto & storage_columns = metadata_snapshot->getColumns();
|
||||
const auto & column_defaults = storage_columns.getDefaults();
|
||||
|
||||
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
|
||||
for (const auto & [name, _] : storage.getColumnTTLs())
|
||||
for (const auto & [name, _] : metadata_snapshot->getColumnTTLs())
|
||||
{
|
||||
auto it = column_defaults.find(name);
|
||||
if (it != column_defaults.end())
|
||||
@ -65,13 +67,12 @@ TTLBlockInputStream::TTLBlockInputStream(
|
||||
|
||||
if (!default_expr_list->children.empty())
|
||||
{
|
||||
auto syntax_result = SyntaxAnalyzer(storage.global_context).analyze(
|
||||
default_expr_list, storage.getColumns().getAllPhysical());
|
||||
auto syntax_result = SyntaxAnalyzer(storage.global_context).analyze(default_expr_list, metadata_snapshot->getColumns().getAllPhysical());
|
||||
defaults_expression = ExpressionAnalyzer{default_expr_list, syntax_result, storage.global_context}.getActions(true);
|
||||
}
|
||||
|
||||
auto storage_rows_ttl = storage.getRowsTTL();
|
||||
if (storage.hasRowsTTL() && storage_rows_ttl.mode == TTLMode::GROUP_BY)
|
||||
auto storage_rows_ttl = metadata_snapshot->getRowsTTL();
|
||||
if (metadata_snapshot->hasRowsTTL() && storage_rows_ttl.mode == TTLMode::GROUP_BY)
|
||||
{
|
||||
current_key_value.resize(storage_rows_ttl.group_by_keys.size());
|
||||
|
||||
@ -106,14 +107,15 @@ bool TTLBlockInputStream::isTTLExpired(time_t ttl) const
|
||||
Block TTLBlockInputStream::readImpl()
|
||||
{
|
||||
/// Skip all data if table ttl is expired for part
|
||||
auto storage_rows_ttl = storage.getRowsTTL();
|
||||
if (storage.hasRowsTTL() && !storage_rows_ttl.where_expression &&
|
||||
storage_rows_ttl.mode != TTLMode::GROUP_BY && isTTLExpired(old_ttl_infos.table_ttl.max))
|
||||
auto storage_rows_ttl = metadata_snapshot->getRowsTTL();
|
||||
if (metadata_snapshot->hasRowsTTL() && !storage_rows_ttl.where_expression && storage_rows_ttl.mode != TTLMode::GROUP_BY
|
||||
&& isTTLExpired(old_ttl_infos.table_ttl.max))
|
||||
{
|
||||
rows_removed = data_part->rows_count;
|
||||
return {};
|
||||
}
|
||||
|
||||
|
||||
Block block = children.at(0)->read();
|
||||
if (!block)
|
||||
{
|
||||
@ -127,7 +129,7 @@ Block TTLBlockInputStream::readImpl()
|
||||
return block;
|
||||
}
|
||||
|
||||
if (storage.hasRowsTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min)))
|
||||
if (metadata_snapshot->hasRowsTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min)))
|
||||
removeRowsWithExpiredTableTTL(block);
|
||||
|
||||
removeValuesWithExpiredColumnTTL(block);
|
||||
@ -153,7 +155,7 @@ void TTLBlockInputStream::readSuffixImpl()
|
||||
|
||||
void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
|
||||
{
|
||||
auto rows_ttl = storage.getRowsTTL();
|
||||
auto rows_ttl = metadata_snapshot->getRowsTTL();
|
||||
|
||||
rows_ttl.expression->execute(block);
|
||||
if (rows_ttl.where_expression)
|
||||
@ -201,7 +203,7 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
|
||||
size_t rows_aggregated = 0;
|
||||
size_t current_key_start = 0;
|
||||
size_t rows_with_current_key = 0;
|
||||
auto storage_rows_ttl = storage.getRowsTTL();
|
||||
auto storage_rows_ttl = metadata_snapshot->getRowsTTL();
|
||||
for (size_t i = 0; i < block.rows(); ++i)
|
||||
{
|
||||
UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
|
||||
@ -278,7 +280,7 @@ void TTLBlockInputStream::finalizeAggregates(MutableColumns & result_columns)
|
||||
if (!agg_result.empty())
|
||||
{
|
||||
auto aggregated_res = aggregator->convertToBlocks(agg_result, true, 1);
|
||||
auto storage_rows_ttl = storage.getRowsTTL();
|
||||
auto storage_rows_ttl = metadata_snapshot->getRowsTTL();
|
||||
for (auto & agg_block : aggregated_res)
|
||||
{
|
||||
for (const auto & it : storage_rows_ttl.set_parts)
|
||||
@ -310,7 +312,7 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
|
||||
}
|
||||
|
||||
std::vector<String> columns_to_remove;
|
||||
for (const auto & [name, ttl_entry] : storage.getColumnTTLs())
|
||||
for (const auto & [name, ttl_entry] : metadata_snapshot->getColumnTTLs())
|
||||
{
|
||||
/// If we read not all table columns. E.g. while mutation.
|
||||
if (!block.has(name))
|
||||
@ -371,7 +373,7 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
|
||||
void TTLBlockInputStream::updateMovesTTL(Block & block)
|
||||
{
|
||||
std::vector<String> columns_to_remove;
|
||||
for (const auto & ttl_entry : storage.getMoveTTLs())
|
||||
for (const auto & ttl_entry : metadata_snapshot->getMoveTTLs())
|
||||
{
|
||||
auto & new_ttl_info = new_ttl_infos.moves_ttl[ttl_entry.result_column];
|
||||
|
||||
|
@ -16,6 +16,7 @@ public:
|
||||
TTLBlockInputStream(
|
||||
const BlockInputStreamPtr & input_,
|
||||
const MergeTreeData & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const MergeTreeData::MutableDataPartPtr & data_part_,
|
||||
time_t current_time,
|
||||
bool force_
|
||||
@ -33,6 +34,7 @@ protected:
|
||||
|
||||
private:
|
||||
const MergeTreeData & storage;
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
|
||||
/// ttl_infos and empty_columns are updating while reading
|
||||
const MergeTreeData::MutableDataPartPtr & data_part;
|
||||
|
@ -69,7 +69,7 @@ BlockIO InterpreterAlterQuery::execute()
|
||||
}
|
||||
else if (auto mut_command = MutationCommand::parse(command_ast))
|
||||
{
|
||||
if (mut_command->type == MutationCommand::MATERIALIZE_TTL && !table->hasAnyTTL())
|
||||
if (mut_command->type == MutationCommand::MATERIALIZE_TTL && !metadata_snapshot->hasAnyTTL())
|
||||
throw Exception("Cannot MATERIALIZE TTL as there is no TTL set for table "
|
||||
+ table->getStorageID().getNameForLogs(), ErrorCodes::INCORRECT_QUERY);
|
||||
|
||||
|
@ -26,7 +26,8 @@ BlockIO InterpreterOptimizeQuery::execute()
|
||||
|
||||
auto table_id = context.resolveStorageID(ast, Context::ResolveOrdinary);
|
||||
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context);
|
||||
table->optimize(query_ptr, ast.partition, ast.final, ast.deduplicate, context);
|
||||
auto metadata_snapshot = table->getInMemoryMetadataPtr();
|
||||
table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, context);
|
||||
return {};
|
||||
}
|
||||
|
||||
|
@ -411,7 +411,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
|
||||
}
|
||||
else if (command.type == MutationCommand::MATERIALIZE_TTL)
|
||||
{
|
||||
if (storage->hasRowsTTL())
|
||||
if (metadata_snapshot->hasRowsTTL())
|
||||
{
|
||||
for (const auto & column : all_columns)
|
||||
dependencies.emplace(column.name, ColumnDependency::TTL_TARGET);
|
||||
|
@ -319,53 +319,6 @@ NamesAndTypesList IStorage::getVirtuals() const
|
||||
return {};
|
||||
}
|
||||
|
||||
TTLTableDescription IStorage::getTableTTLs() const
|
||||
{
|
||||
std::lock_guard lock(ttl_mutex);
|
||||
return metadata->table_ttl;
|
||||
}
|
||||
|
||||
bool IStorage::hasAnyTableTTL() const
|
||||
{
|
||||
return hasAnyMoveTTL() || hasRowsTTL();
|
||||
}
|
||||
|
||||
TTLColumnsDescription IStorage::getColumnTTLs() const
|
||||
{
|
||||
std::lock_guard lock(ttl_mutex);
|
||||
return metadata->column_ttls_by_name;
|
||||
}
|
||||
|
||||
bool IStorage::hasAnyColumnTTL() const
|
||||
{
|
||||
std::lock_guard lock(ttl_mutex);
|
||||
return !metadata->column_ttls_by_name.empty();
|
||||
}
|
||||
|
||||
TTLDescription IStorage::getRowsTTL() const
|
||||
{
|
||||
std::lock_guard lock(ttl_mutex);
|
||||
return metadata->table_ttl.rows_ttl;
|
||||
}
|
||||
|
||||
bool IStorage::hasRowsTTL() const
|
||||
{
|
||||
std::lock_guard lock(ttl_mutex);
|
||||
return metadata->table_ttl.rows_ttl.expression != nullptr;
|
||||
}
|
||||
|
||||
TTLDescriptions IStorage::getMoveTTLs() const
|
||||
{
|
||||
std::lock_guard lock(ttl_mutex);
|
||||
return metadata->table_ttl.move_ttl;
|
||||
}
|
||||
|
||||
bool IStorage::hasAnyMoveTTL() const
|
||||
{
|
||||
std::lock_guard lock(ttl_mutex);
|
||||
return !metadata->table_ttl.move_ttl.empty();
|
||||
}
|
||||
|
||||
ASTPtr IStorage::getSettingsChanges() const
|
||||
{
|
||||
if (metadata->settings_changes)
|
||||
|
@ -129,8 +129,6 @@ public:
|
||||
/// Example is StorageSystemNumbers.
|
||||
virtual bool hasEvenlyDistributedRead() const { return false; }
|
||||
|
||||
/// Returns true if there is set table TTL, any column TTL or any move TTL.
|
||||
bool hasAnyTTL() const { return hasAnyColumnTTL() || hasAnyTableTTL(); }
|
||||
|
||||
/// Optional size information of each physical column.
|
||||
/// Currently it's only used by the MergeTree family for query optimizations.
|
||||
@ -362,7 +360,13 @@ public:
|
||||
/** Perform any background work. For example, combining parts in a MergeTree type table.
|
||||
* Returns whether any work has been done.
|
||||
*/
|
||||
virtual bool optimize(const ASTPtr & /*query*/, const ASTPtr & /*partition*/, bool /*final*/, bool /*deduplicate*/, const Context & /*context*/)
|
||||
virtual bool optimize(
|
||||
const ASTPtr & /*query*/,
|
||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
||||
const ASTPtr & /*partition*/,
|
||||
bool /*final*/,
|
||||
bool /*deduplicate*/,
|
||||
const Context & /*context*/)
|
||||
{
|
||||
throw Exception("Method optimize is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
@ -430,23 +434,6 @@ public:
|
||||
/// Returns storage policy if storage supports it.
|
||||
virtual StoragePolicyPtr getStoragePolicy() const { return {}; }
|
||||
|
||||
/// Common tables TTLs (for rows and moves).
|
||||
TTLTableDescription getTableTTLs() const;
|
||||
bool hasAnyTableTTL() const;
|
||||
|
||||
/// Separate TTLs for columns.
|
||||
TTLColumnsDescription getColumnTTLs() const;
|
||||
bool hasAnyColumnTTL() const;
|
||||
|
||||
/// Just wrapper for table TTLs, return rows part of table TTLs.
|
||||
TTLDescription getRowsTTL() const;
|
||||
bool hasRowsTTL() const;
|
||||
|
||||
/// Just wrapper for table TTLs, return moves (to disks or volumes) parts of
|
||||
/// table TTL.
|
||||
TTLDescriptions getMoveTTLs() const;
|
||||
bool hasAnyMoveTTL() const;
|
||||
|
||||
/// If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
|
||||
/// Used for:
|
||||
/// - Simple count() opimization
|
||||
|
@ -2801,8 +2801,9 @@ MergeTreeData::selectTTLEntryForTTLInfos(const IMergeTreeDataPart::TTLInfos & tt
|
||||
{
|
||||
time_t max_max_ttl = 0;
|
||||
TTLDescriptions::const_iterator best_entry_it;
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
|
||||
const auto & move_ttl_entries = getMoveTTLs();
|
||||
const auto & move_ttl_entries = metadata_snapshot->getMoveTTLs();
|
||||
for (auto ttl_entry_it = move_ttl_entries.begin(); ttl_entry_it != move_ttl_entries.end(); ++ttl_entry_it)
|
||||
{
|
||||
auto ttl_info_it = ttl_infos.moves_ttl.find(ttl_entry_it->result_column);
|
||||
@ -3235,11 +3236,12 @@ bool MergeTreeData::selectPartsAndMove()
|
||||
bool MergeTreeData::areBackgroundMovesNeeded() const
|
||||
{
|
||||
auto policy = getStoragePolicy();
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
|
||||
if (policy->getVolumes().size() > 1)
|
||||
return true;
|
||||
|
||||
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1 && hasAnyMoveTTL();
|
||||
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1 && metadata_snapshot->hasAnyMoveTTL();
|
||||
}
|
||||
|
||||
bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space)
|
||||
|
@ -607,14 +607,18 @@ public:
|
||||
static ReservationPtr tryReserveSpace(UInt64 expected_size, SpacePtr space);
|
||||
|
||||
/// Reserves space at least 1MB preferring best destination according to `ttl_infos`.
|
||||
ReservationPtr reserveSpacePreferringTTLRules(UInt64 expected_size,
|
||||
const IMergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
time_t time_of_move,
|
||||
size_t min_volume_index = 0) const;
|
||||
ReservationPtr tryReserveSpacePreferringTTLRules(UInt64 expected_size,
|
||||
const IMergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
time_t time_of_move,
|
||||
size_t min_volume_index = 0) const;
|
||||
ReservationPtr reserveSpacePreferringTTLRules(
|
||||
UInt64 expected_size,
|
||||
const IMergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
time_t time_of_move,
|
||||
size_t min_volume_index = 0) const;
|
||||
|
||||
ReservationPtr tryReserveSpacePreferringTTLRules(
|
||||
UInt64 expected_size,
|
||||
const IMergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
time_t time_of_move,
|
||||
size_t min_volume_index = 0) const;
|
||||
|
||||
/// Choose disk with max available free space
|
||||
/// Reserves 0 bytes
|
||||
ReservationPtr makeEmptyReservationOnLargestDisk() { return getStoragePolicy()->makeEmptyReservationOnLargestDisk(); }
|
||||
|
@ -801,7 +801,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
||||
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, SizeLimits(), 0 /*limit_hint*/, Names());
|
||||
|
||||
if (need_remove_expired_values)
|
||||
merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, new_data_part, time_of_merge, force_ttl);
|
||||
merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, metadata_snapshot, new_data_part, time_of_merge, force_ttl);
|
||||
|
||||
|
||||
if (metadata_snapshot->hasSecondaryIndices())
|
||||
@ -1576,7 +1576,7 @@ std::set<MergeTreeIndexPtr> MergeTreeDataMergerMutator::getIndicesToRecalculate(
|
||||
|
||||
bool MergeTreeDataMergerMutator::shouldExecuteTTL(const StorageMetadataPtr & metadata_snapshot, const Names & columns, const MutationCommands & commands) const
|
||||
{
|
||||
if (!data.hasAnyTTL())
|
||||
if (!metadata_snapshot->hasAnyTTL())
|
||||
return false;
|
||||
|
||||
for (const auto & command : commands)
|
||||
@ -1609,7 +1609,7 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns(
|
||||
std::make_shared<ExpressionBlockInputStream>(mutating_stream, data.getPrimaryKeyAndSkipIndicesExpression(metadata_snapshot)));
|
||||
|
||||
if (need_remove_expired_values)
|
||||
mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, new_data_part, time_of_mutation, true);
|
||||
mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, metadata_snapshot, new_data_part, time_of_mutation, true);
|
||||
|
||||
IMergeTreeDataPart::MinMaxIndex minmax_idx;
|
||||
|
||||
@ -1656,7 +1656,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
|
||||
throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (need_remove_expired_values)
|
||||
mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, new_data_part, time_of_mutation, true);
|
||||
mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, metadata_snapshot, new_data_part, time_of_mutation, true);
|
||||
|
||||
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
|
||||
MergedColumnOnlyOutputStream out(
|
||||
|
@ -230,11 +230,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
|
||||
size_t expected_size = block.bytes();
|
||||
|
||||
DB::IMergeTreeDataPart::TTLInfos move_ttl_infos;
|
||||
const auto & move_ttl_entries = data.getMoveTTLs();
|
||||
const auto & move_ttl_entries = metadata_snapshot->getMoveTTLs();
|
||||
for (const auto & ttl_entry : move_ttl_entries)
|
||||
updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
|
||||
|
||||
NamesAndTypesList columns = data.getColumns().getAllPhysical().filter(block.getNames());
|
||||
NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
|
||||
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr));
|
||||
VolumePtr volume = data.getStoragePolicy()->getVolume(0);
|
||||
|
||||
@ -289,10 +289,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
|
||||
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocksAlreadySorted);
|
||||
}
|
||||
|
||||
if (data.hasRowsTTL())
|
||||
updateTTL(data.getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);
|
||||
if (metadata_snapshot->hasRowsTTL())
|
||||
updateTTL(metadata_snapshot->getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);
|
||||
|
||||
for (const auto & [name, ttl_entry] : data.getColumnTTLs())
|
||||
for (const auto & [name, ttl_entry] : metadata_snapshot->getColumnTTLs())
|
||||
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.columns_ttl[name], block, true);
|
||||
|
||||
new_data_part->ttl_infos.update(move_ttl_infos);
|
||||
|
@ -484,7 +484,7 @@ void StorageBuffer::shutdown()
|
||||
|
||||
try
|
||||
{
|
||||
optimize(nullptr /*query*/, {} /*partition*/, false /*final*/, false /*deduplicate*/, global_context);
|
||||
optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, global_context);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -503,7 +503,13 @@ void StorageBuffer::shutdown()
|
||||
*
|
||||
* This kind of race condition make very hard to implement proper tests.
|
||||
*/
|
||||
bool StorageBuffer::optimize(const ASTPtr & /*query*/, const ASTPtr & partition, bool final, bool deduplicate, const Context & /*context*/)
|
||||
bool StorageBuffer::optimize(
|
||||
const ASTPtr & /*query*/,
|
||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
||||
const ASTPtr & partition,
|
||||
bool final,
|
||||
bool deduplicate,
|
||||
const Context & /*context*/)
|
||||
{
|
||||
if (partition)
|
||||
throw Exception("Partition cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
|
||||
@ -793,11 +799,12 @@ void StorageBuffer::alter(const AlterCommands & params, const Context & context,
|
||||
|
||||
auto table_id = getStorageID();
|
||||
checkAlterIsPossible(params, context.getSettingsRef());
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
|
||||
/// So that no blocks of the old structure remain.
|
||||
optimize({} /*query*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context);
|
||||
optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context);
|
||||
|
||||
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
|
||||
StorageInMemoryMetadata new_metadata = *metadata_snapshot;
|
||||
params.apply(new_metadata, context);
|
||||
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id, new_metadata);
|
||||
setInMemoryMetadata(new_metadata);
|
||||
|
@ -69,7 +69,7 @@ public:
|
||||
void startup() override;
|
||||
/// Flush all buffers into the subordinate table and stop background thread.
|
||||
void shutdown() override;
|
||||
bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override;
|
||||
bool optimize(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override;
|
||||
|
||||
bool supportsSampling() const override { return true; }
|
||||
bool supportsPrewhere() const override
|
||||
|
@ -86,6 +86,9 @@ struct StorageInMemoryMetadata
|
||||
|
||||
const ConstraintsDescription & getConstraints() const;
|
||||
|
||||
/// Returns true if there is set table TTL, any column TTL or any move TTL.
|
||||
bool hasAnyTTL() const { return hasAnyColumnTTL() || hasAnyTableTTL(); }
|
||||
|
||||
/// Common tables TTLs (for rows and moves).
|
||||
TTLTableDescription getTableTTLs() const;
|
||||
bool hasAnyTableTTL() const;
|
||||
|
@ -187,10 +187,18 @@ void StorageMaterializedView::checkStatementCanBeForwarded() const
|
||||
+ "Execute the statement directly on it.", ErrorCodes::INCORRECT_QUERY);
|
||||
}
|
||||
|
||||
bool StorageMaterializedView::optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context)
|
||||
bool StorageMaterializedView::optimize(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
||||
const ASTPtr & partition,
|
||||
bool final,
|
||||
bool deduplicate,
|
||||
const Context & context)
|
||||
{
|
||||
checkStatementCanBeForwarded();
|
||||
return getTargetTable()->optimize(query, partition, final, deduplicate, context);
|
||||
auto storage_ptr = getTargetTable();
|
||||
auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
|
||||
return getTargetTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, context);
|
||||
}
|
||||
|
||||
void StorageMaterializedView::alter(
|
||||
|
@ -39,7 +39,13 @@ public:
|
||||
|
||||
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
|
||||
|
||||
bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override;
|
||||
bool optimize(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const ASTPtr & partition,
|
||||
bool final,
|
||||
bool deduplicate,
|
||||
const Context & context) override;
|
||||
|
||||
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
|
||||
|
||||
|
@ -693,7 +693,7 @@ bool StorageMergeTree::merge(
|
||||
{
|
||||
/// Force filter by TTL in 'OPTIMIZE ... FINAL' query to remove expired values from old parts
|
||||
/// without TTL infos or with outdated TTL infos, e.g. after 'ALTER ... MODIFY TTL' query.
|
||||
bool force_ttl = (final && hasAnyTTL());
|
||||
bool force_ttl = (final && metadata_snapshot->hasAnyTTL());
|
||||
|
||||
new_part = merger_mutator.mergePartsToTemporaryPart(
|
||||
future_part, metadata_snapshot, *merge_entry, table_lock_holder, time(nullptr),
|
||||
@ -965,7 +965,12 @@ void StorageMergeTree::clearOldMutations(bool truncate)
|
||||
}
|
||||
|
||||
bool StorageMergeTree::optimize(
|
||||
const ASTPtr & /*query*/, const ASTPtr & partition, bool final, bool deduplicate, const Context & context)
|
||||
const ASTPtr & /*query*/,
|
||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
||||
const ASTPtr & partition,
|
||||
bool final,
|
||||
bool deduplicate,
|
||||
const Context & context)
|
||||
{
|
||||
String disable_reason;
|
||||
if (!partition && final)
|
||||
|
@ -53,7 +53,13 @@ public:
|
||||
|
||||
/** Perform the next step in combining the parts.
|
||||
*/
|
||||
bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override;
|
||||
bool optimize(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
||||
const ASTPtr & partition,
|
||||
bool final,
|
||||
bool deduplicate,
|
||||
const Context & context) override;
|
||||
|
||||
void alterPartition(
|
||||
const ASTPtr & query,
|
||||
|
@ -3475,7 +3475,12 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
|
||||
|
||||
|
||||
bool StorageReplicatedMergeTree::optimize(
|
||||
const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context)
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const ASTPtr & partition,
|
||||
bool final,
|
||||
bool deduplicate,
|
||||
const Context & query_context)
|
||||
{
|
||||
assertNotReadonly();
|
||||
|
||||
@ -3498,7 +3503,7 @@ bool StorageReplicatedMergeTree::optimize(
|
||||
return false;
|
||||
};
|
||||
|
||||
bool force_ttl = (final && hasAnyTTL());
|
||||
bool force_ttl = (final && metadata_snapshot->hasAnyTTL());
|
||||
const auto storage_settings_ptr = getSettings();
|
||||
|
||||
if (!partition && final)
|
||||
|
@ -101,7 +101,7 @@ public:
|
||||
|
||||
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
|
||||
|
||||
bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context) override;
|
||||
bool optimize(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context) override;
|
||||
|
||||
void alter(const AlterCommands & params, const Context & query_context, TableStructureWriteLockHolder & table_lock_holder) override;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user