add query 'SYSTEM STOP TTL MERGES'

This commit is contained in:
CurtizJ 2019-08-01 18:36:12 +03:00
parent c30bdecf6d
commit 82f304f81e
14 changed files with 82 additions and 20 deletions

View File

@ -20,7 +20,7 @@ public:
bool force_
);
String getName() const override { return "TTLBlockInputStream"; }
String getName() const override { return "TTL"; }
Block getHeader() const override { return header; }

View File

@ -14,6 +14,7 @@ namespace ActionLocks
extern const StorageActionBlockType PartsSend = 3;
extern const StorageActionBlockType ReplicationQueue = 4;
extern const StorageActionBlockType DistributedSend = 5;
extern const StorageActionBlockType PartsTTLMerge = 6;
}

View File

@ -46,6 +46,7 @@ namespace ActionLocks
extern StorageActionBlockType PartsSend;
extern StorageActionBlockType ReplicationQueue;
extern StorageActionBlockType DistributedSend;
extern StorageActionBlockType PartsTTLMerge;
}
@ -180,6 +181,12 @@ BlockIO InterpreterSystemQuery::execute()
case Type::START_MERGES:
startStopAction(context, query, ActionLocks::PartsMerge, true);
break;
case Type::STOP_TTL_MERGES:
startStopAction(context, query, ActionLocks::PartsTTLMerge, false);
break;
case Type::START_TTL_MERGES:
startStopAction(context, query, ActionLocks::PartsTTLMerge, true);
break;
case Type::STOP_FETCHES:
startStopAction(context, query, ActionLocks::PartsFetch, false);
break;

View File

@ -55,6 +55,10 @@ const char * ASTSystemQuery::typeToString(Type type)
return "STOP MERGES";
case Type::START_MERGES:
return "START MERGES";
case Type::STOP_TTL_MERGES:
return "STOP TTL MERGES";
case Type::START_TTL_MERGES:
return "START TTL MERGES";
case Type::STOP_FETCHES:
return "STOP FETCHES";
case Type::START_FETCHES:
@ -100,6 +104,8 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
if ( type == Type::STOP_MERGES
|| type == Type::START_MERGES
|| type == Type::STOP_TTL_MERGES
|| type == Type::START_TTL_MERGES
|| type == Type::STOP_FETCHES
|| type == Type::START_FETCHES
|| type == Type::STOP_REPLICATED_SENDS

View File

@ -33,6 +33,8 @@ public:
RELOAD_CONFIG,
STOP_MERGES,
START_MERGES,
STOP_TTL_MERGES,
START_TTL_MERGES,
STOP_FETCHES,
START_FETCHES,
STOP_REPLICATED_SENDS,

View File

@ -56,6 +56,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::STOP_MERGES:
case Type::START_MERGES:
case Type::STOP_TTL_MERGES:
case Type::START_TTL_MERGES:
case Type::STOP_FETCHES:
case Type::START_FETCHES:
case Type::STOP_REPLICATED_SENDS:

View File

@ -226,7 +226,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
(current_time - last_merge_with_ttl > data.settings.merge_with_ttl_timeout);
/// NOTE Could allow selection of different merge strategy.
if (can_merge_with_ttl && has_part_with_expired_ttl)
if (can_merge_with_ttl && has_part_with_expired_ttl && !ttl_merges_blocker.isCancelled())
{
merge_selector = std::make_unique<TTLMergeSelector>(current_time);
last_merge_with_ttl = current_time;
@ -526,7 +526,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
{
static const String TMP_PREFIX = "tmp_merge_";
if (actions_blocker.isCancelled())
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
const MergeTreeData::DataPartsVector & parts = future_part.parts;
@ -568,6 +568,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
if (part_min_ttl && part_min_ttl <= time_of_merge)
need_remove_expired_values = true;
if (need_remove_expired_values && ttl_merges_blocker.isCancelled())
{
LOG_INFO(log, "Part " << new_data_part->name << " has values with expired TTL, but merges with TTL are cancelled.");
need_remove_expired_values = false;
}
MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values);
LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
@ -723,8 +729,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
size_t rows_written = 0;
const size_t initial_reservation = disk_reservation ? disk_reservation->getSize() : 0;
auto is_cancelled = [&]() { return merges_blocker.isCancelled()
|| (need_remove_expired_values && ttl_merges_blocker.isCancelled()); };
Block block;
while (!actions_blocker.isCancelled() && (block = merged_stream->read()))
while (!is_cancelled() && (block = merged_stream->read()))
{
rows_written += block.rows();
@ -748,9 +757,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merged_stream->readSuffix();
merged_stream.reset();
if (actions_blocker.isCancelled())
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
if (need_remove_expired_values && ttl_merges_blocker.isCancelled())
throw Exception("Cancelled merging parts with expired TTL", ErrorCodes::ABORTED);
MergeTreeData::DataPart::Checksums checksums_gathered_columns;
/// Gather ordinary columns
@ -814,13 +826,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
size_t column_elems_written = 0;
column_to.writePrefix();
while (!actions_blocker.isCancelled() && (block = column_gathered_stream.read()))
while (!merges_blocker.isCancelled() && (block = column_gathered_stream.read()))
{
column_elems_written += block.rows();
column_to.write(block);
}
if (actions_blocker.isCancelled())
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
column_gathered_stream.readSuffix();
@ -874,7 +886,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
{
auto check_not_cancelled = [&]()
{
if (actions_blocker.isCancelled() || merge_entry->is_cancelled)
if (merges_blocker.isCancelled() || merge_entry->is_cancelled)
throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);
return true;

View File

@ -120,7 +120,8 @@ public:
/** Is used to cancel all merges and mutations. On cancel() call all currently running actions will throw exception soon.
* All new attempts to start a merge or mutation will throw an exception until all 'LockHolder' objects will be destroyed.
*/
ActionBlocker actions_blocker;
ActionBlocker merges_blocker;
ActionBlocker ttl_merges_blocker;
enum class MergeAlgorithm
{

View File

@ -86,7 +86,7 @@ void ReplicatedMergeTreeAlterThread::run()
auto metadata_diff = ReplicatedMergeTreeTableMetadata(storage).checkAndFindDiff(metadata_in_zk, /* allow_alter = */ true);
/// If you need to lock table structure, then suspend merges.
ActionLock merge_blocker = storage.merger_mutator.actions_blocker.cancel();
ActionLock merge_blocker = storage.merger_mutator.merges_blocker.cancel();
MergeTreeData::DataParts parts;

View File

@ -948,7 +948,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
sum_parts_size_in_bytes += part->bytes_on_disk;
}
if (merger_mutator.actions_blocker.isCancelled())
if (merger_mutator.merges_blocker.isCancelled())
{
String reason = "Not executing log entry for part " + entry.new_part_name + " because merges and mutations are cancelled now.";
LOG_DEBUG(log, reason);

View File

@ -41,6 +41,7 @@ namespace ErrorCodes
namespace ActionLocks
{
extern const StorageActionBlockType PartsMerge;
extern const StorageActionBlockType PartsTTLMerge;
}
@ -104,7 +105,7 @@ void StorageMergeTree::shutdown()
if (shutdown_called)
return;
shutdown_called = true;
merger_mutator.actions_blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever();
if (background_task_handle)
background_pool.removeTask(background_task_handle);
}
@ -164,7 +165,7 @@ void StorageMergeTree::truncate(const ASTPtr &, const Context &)
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
auto merge_blocker = merger_mutator.merges_blocker.cancel();
/// NOTE: It's assumed that this method is called under lockForAlter.
@ -252,7 +253,7 @@ void StorageMergeTree::alter(
}
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
auto merge_blocker = merger_mutator.merges_blocker.cancel();
lockNewDataStructureExclusively(table_lock_holder, context.getCurrentQueryId());
@ -734,7 +735,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask()
if (shutdown_called)
return BackgroundProcessingPoolTaskResult::ERROR;
if (merger_mutator.actions_blocker.isCancelled())
if (merger_mutator.merges_blocker.isCancelled())
return BackgroundProcessingPoolTaskResult::ERROR;
try
@ -829,7 +830,7 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Fi
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
auto merge_blocker = merger_mutator.merges_blocker.cancel();
/// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
auto lock_read_structure = lockStructureForShare(false, context.getCurrentQueryId());
@ -986,7 +987,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, cons
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
auto merge_blocker = merger_mutator.merges_blocker.cancel();
/// Waits for completion of merge and does not start new ones.
auto lock = lockExclusively(context.getCurrentQueryId());
@ -1144,7 +1145,9 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type)
{
if (action_type == ActionLocks::PartsMerge)
return merger_mutator.actions_blocker.cancel();
return merger_mutator.merges_blocker.cancel();
else if (action_type == ActionLocks::PartsTTLMerge)
return merger_mutator.ttl_merges_blocker.cancel();
return {};
}

View File

@ -118,6 +118,7 @@ namespace ActionLocks
extern const StorageActionBlockType PartsFetch;
extern const StorageActionBlockType PartsSend;
extern const StorageActionBlockType ReplicationQueue;
extern const StorageActionBlockType PartsTTLMerge;
}
@ -2860,7 +2861,7 @@ void StorageReplicatedMergeTree::shutdown()
{
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever();
merger_mutator.actions_blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever();
restarting_thread.shutdown();
@ -5029,7 +5030,10 @@ ReplicatedMergeTreeAddress StorageReplicatedMergeTree::getReplicatedMergeTreeAdd
ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType action_type)
{
if (action_type == ActionLocks::PartsMerge)
return merger_mutator.actions_blocker.cancel();
return merger_mutator.merges_blocker.cancel();
if (action_type == ActionLocks::PartsTTLMerge)
return merger_mutator.ttl_merges_blocker.cancel();
if (action_type == ActionLocks::PartsFetch)
return fetcher.blocker.cancel();

View File

@ -0,0 +1,6 @@
2000-10-10 1
2000-10-10 2
2100-10-10 3
2100-10-10 4
2100-10-10 3
2100-10-10 4

View File

@ -0,0 +1,18 @@
drop table if exists ttl;
create table ttl (d Date, a Int) engine = MergeTree order by a partition by toDayOfMonth(d) ttl d + interval 1 day;
system stop ttl merges;
insert into ttl values (toDateTime('2000-10-10 00:00:00'), 1), (toDateTime('2000-10-10 00:00:00'), 2)
insert into ttl values (toDateTime('2100-10-10 00:00:00'), 3), (toDateTime('2100-10-10 00:00:00'), 4);
select sleep(1) format Null; -- wait if very fast merge happen
optimize table ttl partition 10 final;
select * from ttl order by d, a;
system start ttl merges;
optimize table ttl partition 10 final;
select * from ttl order by d, a;
drop table if exists ttl;