Merge pull request #6274 from CurtizJ/ttl-old-parts

TTL improvements.
This commit is contained in:
alexey-milovidov 2019-08-02 22:51:28 +03:00 committed by GitHub
commit f8980e691e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 152 additions and 48 deletions

View File

@ -17,10 +17,12 @@ TTLBlockInputStream::TTLBlockInputStream(
const BlockInputStreamPtr & input_,
const MergeTreeData & storage_,
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time_)
time_t current_time_,
bool force_)
: storage(storage_)
, data_part(data_part_)
, current_time(current_time_)
, force(force_)
, old_ttl_infos(data_part->ttl_infos)
, log(&Logger::get(storage.getLogName() + " (TTLBlockInputStream)"))
, date_lut(DateLUT::instance())
@ -60,6 +62,10 @@ TTLBlockInputStream::TTLBlockInputStream(
}
}
bool TTLBlockInputStream::isTTLExpired(time_t ttl)
{
return (ttl && (ttl <= current_time));
}
Block TTLBlockInputStream::readImpl()
{
@ -70,13 +76,13 @@ Block TTLBlockInputStream::readImpl()
if (storage.hasTableTTL())
{
/// Skip all data if table ttl is expired for part
if (old_ttl_infos.table_ttl.max <= current_time)
if (isTTLExpired(old_ttl_infos.table_ttl.max))
{
rows_removed = data_part->rows_count;
return {};
}
if (old_ttl_infos.table_ttl.min <= current_time)
if (force || isTTLExpired(old_ttl_infos.table_ttl.min))
removeRowsWithExpiredTableTTL(block);
}
@ -96,15 +102,15 @@ void TTLBlockInputStream::readSuffixImpl()
data_part->empty_columns = std::move(empty_columns);
if (rows_removed)
LOG_INFO(log, "Removed " << rows_removed << " rows with expired ttl from part " << data_part->name);
LOG_INFO(log, "Removed " << rows_removed << " rows with expired TTL from part " << data_part->name);
}
void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
{
storage.ttl_table_entry.expression->execute(block);
const auto & current = block.getByName(storage.ttl_table_entry.result_column);
const IColumn * ttl_column = current.column.get();
const IColumn * ttl_column =
block.getByName(storage.ttl_table_entry.result_column).column.get();
const auto & column_names = header.getNames();
MutableColumns result_columns;
@ -112,15 +118,14 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
for (auto it = column_names.begin(); it != column_names.end(); ++it)
{
auto & column_with_type = block.getByName(*it);
const IColumn * values_column = column_with_type.column.get();
const IColumn * values_column = block.getByName(*it).column.get();
MutableColumnPtr result_column = values_column->cloneEmpty();
result_column->reserve(block.rows());
for (size_t i = 0; i < block.rows(); ++i)
{
UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
if (cur_ttl > current_time)
if (!isTTLExpired(cur_ttl))
{
new_ttl_infos.table_ttl.update(cur_ttl);
result_column->insertFrom(*values_column, i);
@ -148,10 +153,12 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
const auto & old_ttl_info = old_ttl_infos.columns_ttl[name];
auto & new_ttl_info = new_ttl_infos.columns_ttl[name];
if (old_ttl_info.min > current_time)
/// Nothing to do
if (!force && !isTTLExpired(old_ttl_info.min))
continue;
if (old_ttl_info.max <= current_time)
/// Later drop full column
if (isTTLExpired(old_ttl_info.max))
continue;
if (!block.has(ttl_entry.result_column))
@ -166,14 +173,12 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
MutableColumnPtr result_column = values_column->cloneEmpty();
result_column->reserve(block.rows());
const auto & current = block.getByName(ttl_entry.result_column);
const IColumn * ttl_column = current.column.get();
const IColumn * ttl_column = block.getByName(ttl_entry.result_column).column.get();
for (size_t i = 0; i < block.rows(); ++i)
{
UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
if (cur_ttl <= current_time)
if (isTTLExpired(cur_ttl))
{
if (default_column)
result_column->insertFrom(*default_column, i);

View File

@ -16,10 +16,11 @@ public:
const BlockInputStreamPtr & input_,
const MergeTreeData & storage_,
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time
time_t current_time,
bool force_
);
String getName() const override { return "TTLBlockInputStream"; }
String getName() const override { return "TTL"; }
Block getHeader() const override { return header; }
@ -36,6 +37,7 @@ private:
const MergeTreeData::MutableDataPartPtr & data_part;
time_t current_time;
bool force;
MergeTreeDataPart::TTLInfos old_ttl_infos;
MergeTreeDataPart::TTLInfos new_ttl_infos;
@ -50,13 +52,14 @@ private:
Block header;
private:
/// Removes values with expired ttl and computes new min_ttl and empty_columns for part
/// Removes values with expired ttl and computes new_ttl_infos and empty_columns for part
void removeValuesWithExpiredColumnTTL(Block & block);
/// Remove rows with expired table ttl and computes new min_ttl for part
/// Removes rows with expired table ttl and computes new ttl_infos for part
void removeRowsWithExpiredTableTTL(Block & block);
UInt32 getTimestampByIndex(const IColumn * column, size_t ind);
bool isTTLExpired(time_t ttl);
};
}

View File

@ -14,6 +14,7 @@ namespace ActionLocks
extern const StorageActionBlockType PartsSend = 3;
extern const StorageActionBlockType ReplicationQueue = 4;
extern const StorageActionBlockType DistributedSend = 5;
extern const StorageActionBlockType PartsTTLMerge = 6;
}

View File

@ -46,6 +46,7 @@ namespace ActionLocks
extern StorageActionBlockType PartsSend;
extern StorageActionBlockType ReplicationQueue;
extern StorageActionBlockType DistributedSend;
extern StorageActionBlockType PartsTTLMerge;
}
@ -180,6 +181,12 @@ BlockIO InterpreterSystemQuery::execute()
case Type::START_MERGES:
startStopAction(context, query, ActionLocks::PartsMerge, true);
break;
case Type::STOP_TTL_MERGES:
startStopAction(context, query, ActionLocks::PartsTTLMerge, false);
break;
case Type::START_TTL_MERGES:
startStopAction(context, query, ActionLocks::PartsTTLMerge, true);
break;
case Type::STOP_FETCHES:
startStopAction(context, query, ActionLocks::PartsFetch, false);
break;

View File

@ -55,6 +55,10 @@ const char * ASTSystemQuery::typeToString(Type type)
return "STOP MERGES";
case Type::START_MERGES:
return "START MERGES";
case Type::STOP_TTL_MERGES:
return "STOP TTL MERGES";
case Type::START_TTL_MERGES:
return "START TTL MERGES";
case Type::STOP_FETCHES:
return "STOP FETCHES";
case Type::START_FETCHES:
@ -100,6 +104,8 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
if ( type == Type::STOP_MERGES
|| type == Type::START_MERGES
|| type == Type::STOP_TTL_MERGES
|| type == Type::START_TTL_MERGES
|| type == Type::STOP_FETCHES
|| type == Type::START_FETCHES
|| type == Type::STOP_REPLICATED_SENDS

View File

@ -33,6 +33,8 @@ public:
RELOAD_CONFIG,
STOP_MERGES,
START_MERGES,
STOP_TTL_MERGES,
START_TTL_MERGES,
STOP_FETCHES,
START_FETCHES,
STOP_REPLICATED_SENDS,

View File

@ -56,6 +56,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::STOP_MERGES:
case Type::START_MERGES:
case Type::STOP_TTL_MERGES:
case Type::START_TTL_MERGES:
case Type::STOP_FETCHES:
case Type::START_FETCHES:
case Type::STOP_REPLICATED_SENDS:

View File

@ -531,6 +531,7 @@ public:
bool hasPrimaryKey() const { return !primary_key_columns.empty(); }
bool hasSkipIndices() const { return !skip_indices.empty(); }
bool hasTableTTL() const { return ttl_table_ast != nullptr; }
bool hasAnyColumnTTL() const { return !ttl_entries_by_name.empty(); }
/// Check that the part is not broken and calculate the checksums for it if they are not present.
MutableDataPartPtr loadPartAndFixMetadata(const String & relative_path);

View File

@ -226,7 +226,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
(current_time - last_merge_with_ttl > data.settings.merge_with_ttl_timeout);
/// NOTE Could allow selection of different merge strategy.
if (can_merge_with_ttl && has_part_with_expired_ttl)
if (can_merge_with_ttl && has_part_with_expired_ttl && !ttl_merges_blocker.isCancelled())
{
merge_selector = std::make_unique<TTLMergeSelector>(current_time);
last_merge_with_ttl = current_time;
@ -522,11 +522,11 @@ public:
/// parts should be sorted.
MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
const FutureMergedMutatedPart & future_part, MergeList::Entry & merge_entry,
time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation, bool deduplicate)
time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation, bool deduplicate, bool force_ttl)
{
static const String TMP_PREFIX = "tmp_merge_";
if (actions_blocker.isCancelled())
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
const MergeTreeData::DataPartsVector & parts = future_part.parts;
@ -560,7 +560,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
size_t sum_input_rows_upper_bound = merge_entry->total_rows_count;
bool need_remove_expired_values = false;
bool need_remove_expired_values = force_ttl;
for (const MergeTreeData::DataPartPtr & part : parts)
new_data_part->ttl_infos.update(part->ttl_infos);
@ -568,6 +568,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
if (part_min_ttl && part_min_ttl <= time_of_merge)
need_remove_expired_values = true;
if (need_remove_expired_values && ttl_merges_blocker.isCancelled())
{
LOG_INFO(log, "Part " << new_data_part->name << " has values with expired TTL, but merges with TTL are cancelled.");
need_remove_expired_values = false;
}
MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values);
@ -707,7 +712,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, SizeLimits(), 0 /*limit_hint*/, Names());
if (need_remove_expired_values)
merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, new_data_part, time_of_merge);
merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, new_data_part, time_of_merge, force_ttl);
MergedBlockOutputStream to{
data,
@ -724,8 +729,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
size_t rows_written = 0;
const size_t initial_reservation = disk_reservation ? disk_reservation->getSize() : 0;
auto is_cancelled = [&]() { return merges_blocker.isCancelled()
|| (need_remove_expired_values && ttl_merges_blocker.isCancelled()); };
Block block;
while (!actions_blocker.isCancelled() && (block = merged_stream->read()))
while (!is_cancelled() && (block = merged_stream->read()))
{
rows_written += block.rows();
@ -749,9 +757,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merged_stream->readSuffix();
merged_stream.reset();
if (actions_blocker.isCancelled())
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
if (need_remove_expired_values && ttl_merges_blocker.isCancelled())
throw Exception("Cancelled merging parts with expired TTL", ErrorCodes::ABORTED);
MergeTreeData::DataPart::Checksums checksums_gathered_columns;
/// Gather ordinary columns
@ -815,13 +826,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
size_t column_elems_written = 0;
column_to.writePrefix();
while (!actions_blocker.isCancelled() && (block = column_gathered_stream.read()))
while (!merges_blocker.isCancelled() && (block = column_gathered_stream.read()))
{
column_elems_written += block.rows();
column_to.write(block);
}
if (actions_blocker.isCancelled())
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
column_gathered_stream.readSuffix();
@ -875,7 +886,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
{
auto check_not_cancelled = [&]()
{
if (actions_blocker.isCancelled() || merge_entry->is_cancelled)
if (merges_blocker.isCancelled() || merge_entry->is_cancelled)
throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);
return true;

View File

@ -95,7 +95,7 @@ public:
MergeTreeData::MutableDataPartPtr mergePartsToTemporaryPart(
const FutureMergedMutatedPart & future_part,
MergeListEntry & merge_entry, time_t time_of_merge,
DiskSpaceMonitor::Reservation * disk_reservation, bool deduplication);
DiskSpaceMonitor::Reservation * disk_reservation, bool deduplication, bool force_ttl);
/// Mutate a single data part with the specified commands. Will create and return a temporary part.
MergeTreeData::MutableDataPartPtr mutatePartToTemporaryPart(
@ -120,7 +120,8 @@ public:
/** Is used to cancel all merges and mutations. On cancel() call all currently running actions will throw exception soon.
* All new attempts to start a merge or mutation will throw an exception until all 'LockHolder' objects will be destroyed.
*/
ActionBlocker actions_blocker;
ActionBlocker merges_blocker;
ActionBlocker ttl_merges_blocker;
enum class MergeAlgorithm
{

View File

@ -86,7 +86,7 @@ void ReplicatedMergeTreeAlterThread::run()
auto metadata_diff = ReplicatedMergeTreeTableMetadata(storage).checkAndFindDiff(metadata_in_zk, /* allow_alter = */ true);
/// If you need to lock table structure, then suspend merges.
ActionLock merge_blocker = storage.merger_mutator.actions_blocker.cancel();
ActionLock merge_blocker = storage.merger_mutator.merges_blocker.cancel();
MergeTreeData::DataParts parts;

View File

@ -77,6 +77,10 @@ struct ReplicatedMergeTreeLogEntryData
bool deduplicate = false; /// Do deduplicate on merge
String column_name;
/// Force filter by TTL in 'OPTIMIZE ... FINAL' query to remove expired values from old parts
/// without TTL infos or with outdated TTL infos, e.g. after 'ALTER ... MODIFY TTL' query.
bool force_ttl = false;
/// For DROP_RANGE, true means that the parts need not be deleted, but moved to the `detached` directory.
bool detach = false;

View File

@ -948,7 +948,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
sum_parts_size_in_bytes += part->bytes_on_disk;
}
if (merger_mutator.actions_blocker.isCancelled())
if (merger_mutator.merges_blocker.isCancelled())
{
String reason = "Not executing log entry for part " + entry.new_part_name + " because merges and mutations are cancelled now.";
LOG_DEBUG(log, reason);

View File

@ -41,6 +41,7 @@ namespace ErrorCodes
namespace ActionLocks
{
extern const StorageActionBlockType PartsMerge;
extern const StorageActionBlockType PartsTTLMerge;
}
@ -104,7 +105,7 @@ void StorageMergeTree::shutdown()
if (shutdown_called)
return;
shutdown_called = true;
merger_mutator.actions_blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever();
if (background_task_handle)
background_pool.removeTask(background_task_handle);
}
@ -164,7 +165,7 @@ void StorageMergeTree::truncate(const ASTPtr &, const Context &)
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
auto merge_blocker = merger_mutator.merges_blocker.cancel();
/// NOTE: It's assumed that this method is called under lockForAlter.
@ -252,7 +253,7 @@ void StorageMergeTree::alter(
}
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
auto merge_blocker = merger_mutator.merges_blocker.cancel();
lockNewDataStructureExclusively(table_lock_holder, context.getCurrentQueryId());
@ -590,9 +591,13 @@ bool StorageMergeTree::merge(
try
{
/// Force filter by TTL in 'OPTIMIZE ... FINAL' query to remove expired values from old parts
/// without TTL infos or with outdated TTL infos, e.g. after 'ALTER ... MODIFY TTL' query.
bool force_ttl = (final && (hasTableTTL() || hasAnyColumnTTL()));
new_part = merger_mutator.mergePartsToTemporaryPart(
future_part, *merge_entry, time(nullptr),
merging_tagger->reserved_space.get(), deduplicate);
merging_tagger->reserved_space.get(), deduplicate, force_ttl);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
removeEmptyColumnsFromPart(new_part);
@ -730,7 +735,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask()
if (shutdown_called)
return BackgroundProcessingPoolTaskResult::ERROR;
if (merger_mutator.actions_blocker.isCancelled())
if (merger_mutator.merges_blocker.isCancelled())
return BackgroundProcessingPoolTaskResult::ERROR;
try
@ -825,7 +830,7 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Fi
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
auto merge_blocker = merger_mutator.merges_blocker.cancel();
/// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
auto lock_read_structure = lockStructureForShare(false, context.getCurrentQueryId());
@ -982,7 +987,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, cons
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
auto merge_blocker = merger_mutator.merges_blocker.cancel();
/// Waits for completion of merge and does not start new ones.
auto lock = lockExclusively(context.getCurrentQueryId());
@ -1140,7 +1145,9 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type)
{
if (action_type == ActionLocks::PartsMerge)
return merger_mutator.actions_blocker.cancel();
return merger_mutator.merges_blocker.cancel();
else if (action_type == ActionLocks::PartsTTLMerge)
return merger_mutator.ttl_merges_blocker.cancel();
return {};
}

View File

@ -119,6 +119,7 @@ namespace ActionLocks
extern const StorageActionBlockType PartsFetch;
extern const StorageActionBlockType PartsSend;
extern const StorageActionBlockType ReplicationQueue;
extern const StorageActionBlockType PartsTTLMerge;
}
@ -1076,7 +1077,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
try
{
part = merger_mutator.mergePartsToTemporaryPart(
future_merged_part, *merge_entry, entry.create_time, reserved_space.get(), entry.deduplicate);
future_merged_part, *merge_entry, entry.create_time, reserved_space.get(), entry.deduplicate, entry.force_ttl);
merger_mutator.renameMergedTemporaryPart(part, parts, &transaction);
removeEmptyColumnsFromPart(part);
@ -2150,6 +2151,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
return;
const bool deduplicate = false; /// TODO: read deduplicate option from table config
const bool force_ttl = false;
bool success = false;
@ -2183,7 +2185,8 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
if (max_source_parts_size_for_merge > 0 &&
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred))
{
success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate);
success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts,
future_merged_part.name, deduplicate, force_ttl);
}
else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0)
{
@ -2247,6 +2250,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
const DataPartsVector & parts,
const String & merged_name,
bool deduplicate,
bool force_ttl,
ReplicatedMergeTreeLogEntryData * out_log_entry)
{
std::vector<std::future<Coordination::ExistsResponse>> exists_futures;
@ -2282,6 +2286,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
entry.source_replica = replica_name;
entry.new_part_name = merged_name;
entry.deduplicate = deduplicate;
entry.force_ttl = force_ttl;
entry.create_time = time(nullptr);
for (const auto & part : parts)
@ -2850,7 +2855,7 @@ void StorageReplicatedMergeTree::shutdown()
{
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever();
merger_mutator.actions_blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever();
restarting_thread.shutdown();
@ -2992,6 +2997,8 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
return false;
};
bool force_ttl = (final && (hasTableTTL() || hasAnyColumnTTL()));
if (!partition && final)
{
DataPartsVector data_parts = getDataPartsVector();
@ -3008,8 +3015,8 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
bool selected = merger_mutator.selectAllPartsToMergeWithinPartition(
future_merged_part, disk_space, can_merge, partition_id, true, nullptr);
ReplicatedMergeTreeLogEntryData merge_entry;
if (selected &&
!createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate, &merge_entry))
if (selected && !createLogEntryToMergeParts(zookeeper, future_merged_part.parts,
future_merged_part.name, deduplicate, force_ttl, &merge_entry))
return handle_noop("Can't create merge queue node in ZooKeeper");
if (merge_entry.type != ReplicatedMergeTreeLogEntryData::Type::EMPTY)
merge_entries.push_back(std::move(merge_entry));
@ -3044,7 +3051,8 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
}
ReplicatedMergeTreeLogEntryData merge_entry;
if (!createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate, &merge_entry))
if (!createLogEntryToMergeParts(zookeeper, future_merged_part.parts,
future_merged_part.name, deduplicate, force_ttl, &merge_entry))
return handle_noop("Can't create merge queue node in ZooKeeper");
if (merge_entry.type != ReplicatedMergeTreeLogEntryData::Type::EMPTY)
merge_entries.push_back(std::move(merge_entry));
@ -5017,7 +5025,10 @@ ReplicatedMergeTreeAddress StorageReplicatedMergeTree::getReplicatedMergeTreeAdd
ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType action_type)
{
if (action_type == ActionLocks::PartsMerge)
return merger_mutator.actions_blocker.cancel();
return merger_mutator.merges_blocker.cancel();
if (action_type == ActionLocks::PartsTTLMerge)
return merger_mutator.ttl_merges_blocker.cancel();
if (action_type == ActionLocks::PartsFetch)
return fetcher.blocker.cancel();

View File

@ -424,6 +424,7 @@ private:
const DataPartsVector & parts,
const String & merged_name,
bool deduplicate,
bool force_ttl,
ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr);
bool createLogEntryToMutatePart(const MergeTreeDataPart & part, Int64 mutation_version);

View File

@ -0,0 +1,6 @@
2000-10-10 1
2000-10-10 2
2100-10-10 3
2100-10-10 4
2100-10-10 3
2100-10-10 4

View File

@ -0,0 +1,18 @@
drop table if exists ttl;
create table ttl (d Date, a Int) engine = MergeTree order by a partition by toDayOfMonth(d) ttl d + interval 1 day;
system stop ttl merges;
insert into ttl values (toDateTime('2000-10-10 00:00:00'), 1), (toDateTime('2000-10-10 00:00:00'), 2)
insert into ttl values (toDateTime('2100-10-10 00:00:00'), 3), (toDateTime('2100-10-10 00:00:00'), 4);
select sleep(1) format Null; -- wait if very fast merge happen
optimize table ttl partition 10 final;
select * from ttl order by d, a;
system start ttl merges;
optimize table ttl partition 10 final;
select * from ttl order by d, a;
drop table if exists ttl;

View File

@ -0,0 +1,2 @@
2100-10-10 3
2100-10-10 4

View File

@ -0,0 +1,16 @@
drop table if exists ttl;
create table ttl (d Date, a Int) engine = MergeTree order by a partition by toDayOfMonth(d);
insert into ttl values (toDateTime('2000-10-10 00:00:00'), 1);
insert into ttl values (toDateTime('2000-10-10 00:00:00'), 2);
insert into ttl values (toDateTime('2100-10-10 00:00:00'), 3);
insert into ttl values (toDateTime('2100-10-10 00:00:00'), 4);
alter table ttl modify ttl d + interval 1 day;
select sleep(1) format Null; -- wait if very fast merge happen
optimize table ttl partition 10 final;
select * from ttl order by d;
drop table if exists ttl;