diff --git a/dbms/src/DataStreams/TTLBlockInputStream.h b/dbms/src/DataStreams/TTLBlockInputStream.h index a57c6cf74bf..5ed6aa9e520 100644 --- a/dbms/src/DataStreams/TTLBlockInputStream.h +++ b/dbms/src/DataStreams/TTLBlockInputStream.h @@ -20,7 +20,7 @@ public: bool force_ ); - String getName() const override { return "TTLBlockInputStream"; } + String getName() const override { return "TTL"; } Block getHeader() const override { return header; } diff --git a/dbms/src/Interpreters/ActionLocksManager.cpp b/dbms/src/Interpreters/ActionLocksManager.cpp index 1f9329f85a9..56e0e37c432 100644 --- a/dbms/src/Interpreters/ActionLocksManager.cpp +++ b/dbms/src/Interpreters/ActionLocksManager.cpp @@ -14,6 +14,7 @@ namespace ActionLocks extern const StorageActionBlockType PartsSend = 3; extern const StorageActionBlockType ReplicationQueue = 4; extern const StorageActionBlockType DistributedSend = 5; + extern const StorageActionBlockType PartsTTLMerge = 6; } diff --git a/dbms/src/Interpreters/InterpreterSystemQuery.cpp b/dbms/src/Interpreters/InterpreterSystemQuery.cpp index 16d6fe5ff93..b15479704f8 100644 --- a/dbms/src/Interpreters/InterpreterSystemQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSystemQuery.cpp @@ -46,6 +46,7 @@ namespace ActionLocks extern StorageActionBlockType PartsSend; extern StorageActionBlockType ReplicationQueue; extern StorageActionBlockType DistributedSend; + extern StorageActionBlockType PartsTTLMerge; } @@ -180,6 +181,12 @@ BlockIO InterpreterSystemQuery::execute() case Type::START_MERGES: startStopAction(context, query, ActionLocks::PartsMerge, true); break; + case Type::STOP_TTL_MERGES: + startStopAction(context, query, ActionLocks::PartsTTLMerge, false); + break; + case Type::START_TTL_MERGES: + startStopAction(context, query, ActionLocks::PartsTTLMerge, true); + break; case Type::STOP_FETCHES: startStopAction(context, query, ActionLocks::PartsFetch, false); break; diff --git a/dbms/src/Parsers/ASTSystemQuery.cpp b/dbms/src/Parsers/ASTSystemQuery.cpp index 699dd9d0f54..115a922dc99 100644 --- a/dbms/src/Parsers/ASTSystemQuery.cpp +++ b/dbms/src/Parsers/ASTSystemQuery.cpp @@ -55,6 +55,10 @@ const char * ASTSystemQuery::typeToString(Type type) return "STOP MERGES"; case Type::START_MERGES: return "START MERGES"; + case Type::STOP_TTL_MERGES: + return "STOP TTL MERGES"; + case Type::START_TTL_MERGES: + return "START TTL MERGES"; case Type::STOP_FETCHES: return "STOP FETCHES"; case Type::START_FETCHES: @@ -100,6 +104,8 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &, if ( type == Type::STOP_MERGES || type == Type::START_MERGES + || type == Type::STOP_TTL_MERGES + || type == Type::START_TTL_MERGES || type == Type::STOP_FETCHES || type == Type::START_FETCHES || type == Type::STOP_REPLICATED_SENDS diff --git a/dbms/src/Parsers/ASTSystemQuery.h b/dbms/src/Parsers/ASTSystemQuery.h index eef7a38425b..3418bccd350 100644 --- a/dbms/src/Parsers/ASTSystemQuery.h +++ b/dbms/src/Parsers/ASTSystemQuery.h @@ -33,6 +33,8 @@ public: RELOAD_CONFIG, STOP_MERGES, START_MERGES, + STOP_TTL_MERGES, + START_TTL_MERGES, STOP_FETCHES, START_FETCHES, STOP_REPLICATED_SENDS, diff --git a/dbms/src/Parsers/ParserSystemQuery.cpp b/dbms/src/Parsers/ParserSystemQuery.cpp index 333613e9512..57263c65601 100644 --- a/dbms/src/Parsers/ParserSystemQuery.cpp +++ b/dbms/src/Parsers/ParserSystemQuery.cpp @@ -56,6 +56,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & case Type::STOP_MERGES: case Type::START_MERGES: + case Type::STOP_TTL_MERGES: + case Type::START_TTL_MERGES: case Type::STOP_FETCHES: case Type::START_FETCHES: case Type::STOP_REPLICATED_SENDS: diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 2b17475801f..3a4d4038317 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -226,7 +226,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge( (current_time - last_merge_with_ttl > data.settings.merge_with_ttl_timeout); /// NOTE Could allow selection of different merge strategy. - if (can_merge_with_ttl && has_part_with_expired_ttl) + if (can_merge_with_ttl && has_part_with_expired_ttl && !ttl_merges_blocker.isCancelled()) { merge_selector = std::make_unique(current_time); last_merge_with_ttl = current_time; @@ -526,7 +526,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor { static const String TMP_PREFIX = "tmp_merge_"; - if (actions_blocker.isCancelled()) + if (merges_blocker.isCancelled()) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); const MergeTreeData::DataPartsVector & parts = future_part.parts; @@ -568,6 +568,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor if (part_min_ttl && part_min_ttl <= time_of_merge) need_remove_expired_values = true; + if (need_remove_expired_values && ttl_merges_blocker.isCancelled()) + { + LOG_INFO(log, "Part " << new_data_part->name << " has values with expired TTL, but merges with TTL are cancelled."); + need_remove_expired_values = false; + } + MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values); LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal")); @@ -723,8 +729,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor size_t rows_written = 0; const size_t initial_reservation = disk_reservation ? disk_reservation->getSize() : 0; + auto is_cancelled = [&]() { return merges_blocker.isCancelled() + || (need_remove_expired_values && ttl_merges_blocker.isCancelled()); }; + Block block; - while (!actions_blocker.isCancelled() && (block = merged_stream->read())) + while (!is_cancelled() && (block = merged_stream->read())) { rows_written += block.rows(); @@ -748,9 +757,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor merged_stream->readSuffix(); merged_stream.reset(); - if (actions_blocker.isCancelled()) + if (merges_blocker.isCancelled()) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); + if (need_remove_expired_values && ttl_merges_blocker.isCancelled()) + throw Exception("Cancelled merging parts with expired TTL", ErrorCodes::ABORTED); + MergeTreeData::DataPart::Checksums checksums_gathered_columns; /// Gather ordinary columns @@ -814,13 +826,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor size_t column_elems_written = 0; column_to.writePrefix(); - while (!actions_blocker.isCancelled() && (block = column_gathered_stream.read())) + while (!merges_blocker.isCancelled() && (block = column_gathered_stream.read())) { column_elems_written += block.rows(); column_to.write(block); } - if (actions_blocker.isCancelled()) + if (merges_blocker.isCancelled()) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); column_gathered_stream.readSuffix(); @@ -874,7 +886,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor { auto check_not_cancelled = [&]() { - if (actions_blocker.isCancelled() || merge_entry->is_cancelled) + if (merges_blocker.isCancelled() || merge_entry->is_cancelled) throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED); return true; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index a380b5c2bc2..43e3cd15910 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -120,7 +120,8 @@ public: /** Is used to cancel all merges and mutations. On cancel() call all currently running actions will throw exception soon. * All new attempts to start a merge or mutation will throw an exception until all 'LockHolder' objects will be destroyed. */ - ActionBlocker actions_blocker; + ActionBlocker merges_blocker; + ActionBlocker ttl_merges_blocker; enum class MergeAlgorithm { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp index 07d3122f29e..fd0c3ace0ab 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp @@ -86,7 +86,7 @@ void ReplicatedMergeTreeAlterThread::run() auto metadata_diff = ReplicatedMergeTreeTableMetadata(storage).checkAndFindDiff(metadata_in_zk, /* allow_alter = */ true); /// If you need to lock table structure, then suspend merges. - ActionLock merge_blocker = storage.merger_mutator.actions_blocker.cancel(); + ActionLock merge_blocker = storage.merger_mutator.merges_blocker.cancel(); MergeTreeData::DataParts parts; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 6ce01f7b500..a6be0c3b9a9 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -948,7 +948,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( sum_parts_size_in_bytes += part->bytes_on_disk; } - if (merger_mutator.actions_blocker.isCancelled()) + if (merger_mutator.merges_blocker.isCancelled()) { String reason = "Not executing log entry for part " + entry.new_part_name + " because merges and mutations are cancelled now."; LOG_DEBUG(log, reason); diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 69020023df5..9c4b6559f39 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -41,6 +41,7 @@ namespace ErrorCodes namespace ActionLocks { extern const StorageActionBlockType PartsMerge; + extern const StorageActionBlockType PartsTTLMerge; } @@ -104,7 +105,7 @@ void StorageMergeTree::shutdown() if (shutdown_called) return; shutdown_called = true; - merger_mutator.actions_blocker.cancelForever(); + merger_mutator.merges_blocker.cancelForever(); if (background_task_handle) background_pool.removeTask(background_task_handle); } @@ -164,7 +165,7 @@ void StorageMergeTree::truncate(const ASTPtr &, const Context &) { /// Asks to complete merges and does not allow them to start. /// This protects against "revival" of data for a removed partition after completion of merge. - auto merge_blocker = merger_mutator.actions_blocker.cancel(); + auto merge_blocker = merger_mutator.merges_blocker.cancel(); /// NOTE: It's assumed that this method is called under lockForAlter. @@ -252,7 +253,7 @@ void StorageMergeTree::alter( } /// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time. - auto merge_blocker = merger_mutator.actions_blocker.cancel(); + auto merge_blocker = merger_mutator.merges_blocker.cancel(); lockNewDataStructureExclusively(table_lock_holder, context.getCurrentQueryId()); @@ -734,7 +735,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask() if (shutdown_called) return BackgroundProcessingPoolTaskResult::ERROR; - if (merger_mutator.actions_blocker.isCancelled()) + if (merger_mutator.merges_blocker.isCancelled()) return BackgroundProcessingPoolTaskResult::ERROR; try @@ -829,7 +830,7 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Fi { /// Asks to complete merges and does not allow them to start. /// This protects against "revival" of data for a removed partition after completion of merge. - auto merge_blocker = merger_mutator.actions_blocker.cancel(); + auto merge_blocker = merger_mutator.merges_blocker.cancel(); /// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function auto lock_read_structure = lockStructureForShare(false, context.getCurrentQueryId()); @@ -986,7 +987,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, cons { /// Asks to complete merges and does not allow them to start. /// This protects against "revival" of data for a removed partition after completion of merge. - auto merge_blocker = merger_mutator.actions_blocker.cancel(); + auto merge_blocker = merger_mutator.merges_blocker.cancel(); /// Waits for completion of merge and does not start new ones. auto lock = lockExclusively(context.getCurrentQueryId()); @@ -1144,7 +1145,9 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type) { if (action_type == ActionLocks::PartsMerge) - return merger_mutator.actions_blocker.cancel(); + return merger_mutator.merges_blocker.cancel(); + else if (action_type == ActionLocks::PartsTTLMerge) + return merger_mutator.ttl_merges_blocker.cancel(); return {}; } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 8492dd2c007..302e6743db1 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -118,6 +118,7 @@ namespace ActionLocks extern const StorageActionBlockType PartsFetch; extern const StorageActionBlockType PartsSend; extern const StorageActionBlockType ReplicationQueue; + extern const StorageActionBlockType PartsTTLMerge; } @@ -2860,7 +2861,7 @@ void StorageReplicatedMergeTree::shutdown() { /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. fetcher.blocker.cancelForever(); - merger_mutator.actions_blocker.cancelForever(); + merger_mutator.merges_blocker.cancelForever(); restarting_thread.shutdown(); @@ -5029,7 +5030,10 @@ ReplicatedMergeTreeAddress StorageReplicatedMergeTree::getReplicatedMergeTreeAdd ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType action_type) { if (action_type == ActionLocks::PartsMerge) - return merger_mutator.actions_blocker.cancel(); + return merger_mutator.merges_blocker.cancel(); + + if (action_type == ActionLocks::PartsTTLMerge) + return merger_mutator.ttl_merges_blocker.cancel(); if (action_type == ActionLocks::PartsFetch) return fetcher.blocker.cancel(); diff --git a/dbms/tests/queries/0_stateless/00976_system_stop_ttl_merges.reference b/dbms/tests/queries/0_stateless/00976_system_stop_ttl_merges.reference new file mode 100644 index 00000000000..07c0a1ee521 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00976_system_stop_ttl_merges.reference @@ -0,0 +1,6 @@ +2000-10-10 1 +2000-10-10 2 +2100-10-10 3 +2100-10-10 4 +2100-10-10 3 +2100-10-10 4 diff --git a/dbms/tests/queries/0_stateless/00976_system_stop_ttl_merges.sql b/dbms/tests/queries/0_stateless/00976_system_stop_ttl_merges.sql new file mode 100644 index 00000000000..41f2428d9e6 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00976_system_stop_ttl_merges.sql @@ -0,0 +1,18 @@ +drop table if exists ttl; + +create table ttl (d Date, a Int) engine = MergeTree order by a partition by toDayOfMonth(d) ttl d + interval 1 day; + +system stop ttl merges; + +insert into ttl values (toDateTime('2000-10-10 00:00:00'), 1), (toDateTime('2000-10-10 00:00:00'), 2) +insert into ttl values (toDateTime('2100-10-10 00:00:00'), 3), (toDateTime('2100-10-10 00:00:00'), 4); + +select sleep(1) format Null; -- wait if very fast merge happen +optimize table ttl partition 10 final; +select * from ttl order by d, a; + +system start ttl merges; +optimize table ttl partition 10 final; +select * from ttl order by d, a; + +drop table if exists ttl;