mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 09:02:00 +00:00
Backport #61610 to 24.3: Cancel merges before removing moved parts
This commit is contained in:
parent
7b9ac07f6d
commit
8a7b39a1fd
@ -29,11 +29,13 @@ public:
|
||||
requires std::is_convertible_v<G, F>
|
||||
constexpr BasicScopeGuard & operator=(BasicScopeGuard<G> && src) // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved, cppcoreguidelines-noexcept-move-operations)
|
||||
{
|
||||
if (this != &src)
|
||||
if constexpr (std::is_same_v<G, F>)
|
||||
{
|
||||
if (this == &src)
|
||||
return *this;
|
||||
}
|
||||
invoke();
|
||||
function = src.release();
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
@ -8,10 +8,8 @@
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include "Storages/MutationCommands.h"
|
||||
#include <Parsers/formatAST.h>
|
||||
#include <base/sort.h>
|
||||
|
||||
#include <ranges>
|
||||
#include <Poco/Timestamp.h>
|
||||
|
||||
@ -221,6 +219,43 @@ void ReplicatedMergeTreeQueue::createLogEntriesToFetchBrokenParts()
|
||||
broken_parts_to_enqueue_fetches_on_loading.clear();
|
||||
}
|
||||
|
||||
void ReplicatedMergeTreeQueue::addDropReplaceIntent(const MergeTreePartInfo & intent)
|
||||
{
|
||||
std::lock_guard lock{state_mutex};
|
||||
drop_replace_range_intents.push_back(intent);
|
||||
}
|
||||
|
||||
void ReplicatedMergeTreeQueue::removeDropReplaceIntent(const MergeTreePartInfo & intent)
|
||||
{
|
||||
std::lock_guard lock{state_mutex};
|
||||
auto it = std::find(drop_replace_range_intents.begin(), drop_replace_range_intents.end(), intent);
|
||||
chassert(it != drop_replace_range_intents.end());
|
||||
drop_replace_range_intents.erase(it);
|
||||
}
|
||||
|
||||
bool ReplicatedMergeTreeQueue::isIntersectingWithDropReplaceIntent(
|
||||
const LogEntry & entry, const String & part_name, String & out_reason, std::unique_lock<std::mutex> & /*state_mutex lock*/) const
|
||||
{
|
||||
const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
|
||||
for (const auto & intent : drop_replace_range_intents)
|
||||
{
|
||||
if (!intent.isDisjoint(part_info))
|
||||
{
|
||||
constexpr auto fmt_string = "Not executing {} of type {} for part {} (actual part {})"
|
||||
"because there is a drop or replace intent with part name {}.";
|
||||
LOG_INFO(
|
||||
LogToStr(out_reason, log),
|
||||
fmt_string,
|
||||
entry.znode_name,
|
||||
entry.type,
|
||||
entry.new_part_name,
|
||||
part_name,
|
||||
intent.getPartNameForLogs());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void ReplicatedMergeTreeQueue::insertUnlocked(
|
||||
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
|
||||
@ -1175,6 +1210,33 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
|
||||
entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; });
|
||||
}
|
||||
|
||||
void ReplicatedMergeTreeQueue::waitForCurrentlyExecutingOpsInRange(const MergeTreePartInfo & part_info) const
|
||||
{
|
||||
Queue to_wait;
|
||||
|
||||
std::unique_lock lock(state_mutex);
|
||||
|
||||
for (const auto& entry : queue)
|
||||
{
|
||||
if (!entry->currently_executing)
|
||||
continue;
|
||||
|
||||
const auto virtual_part_names = entry->getVirtualPartNames(format_version);
|
||||
for (const auto & virtual_part_name : virtual_part_names)
|
||||
{
|
||||
if (!part_info.isDisjoint(MergeTreePartInfo::fromPartName(virtual_part_name, format_version)))
|
||||
{
|
||||
to_wait.push_back(entry);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Waiting for {} entries that are currently executing.", to_wait.size());
|
||||
|
||||
for (LogEntryPtr & entry : to_wait)
|
||||
entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; });
|
||||
}
|
||||
|
||||
bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry, const String & new_part_name,
|
||||
String & out_reason, std::unique_lock<std::mutex> & /* queue_lock */,
|
||||
@ -1303,6 +1365,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
||||
/// We can wait in worker threads, but not in scheduler.
|
||||
if (isCoveredByFuturePartsImpl(entry, new_part_name, out_postpone_reason, state_lock, /* covered_entries_to_wait */ nullptr))
|
||||
return false;
|
||||
|
||||
if (isIntersectingWithDropReplaceIntent(entry, new_part_name, out_postpone_reason, state_lock))
|
||||
return false;
|
||||
}
|
||||
|
||||
if (entry.type != LogEntry::DROP_RANGE && entry.type != LogEntry::DROP_PART)
|
||||
|
@ -107,6 +107,8 @@ private:
|
||||
*/
|
||||
ActiveDataPartSet virtual_parts;
|
||||
|
||||
/// Used to prevent operations to start in ranges which will be affected by DROP_RANGE/REPLACE_RANGE
|
||||
std::vector<MergeTreePartInfo> drop_replace_range_intents;
|
||||
|
||||
/// We do not add DROP_PARTs to virtual_parts because they can intersect,
|
||||
/// so we store them separately in this structure.
|
||||
@ -251,6 +253,10 @@ private:
|
||||
std::optional<time_t> min_unprocessed_insert_time_changed,
|
||||
std::optional<time_t> max_processed_insert_time_changed) const;
|
||||
|
||||
bool isIntersectingWithDropReplaceIntent(
|
||||
const LogEntry & entry,
|
||||
const String & part_name, String & out_reason, std::unique_lock<std::mutex> & /*state_mutex lock*/) const;
|
||||
|
||||
/// Marks the element of the queue as running.
|
||||
class CurrentlyExecuting
|
||||
{
|
||||
@ -349,6 +355,9 @@ public:
|
||||
const MergeTreePartInfo & part_info,
|
||||
const std::optional<ReplicatedMergeTreeLogEntryData> & covering_entry);
|
||||
|
||||
/// Wait for the execution of currently executing actions with virtual parts intersecting with part_info
|
||||
void waitForCurrentlyExecutingOpsInRange(const MergeTreePartInfo & part_info) const;
|
||||
|
||||
/** In the case where there are not enough parts to perform the merge in part_name
|
||||
* - move actions with merged parts to the end of the queue
|
||||
* (in order to download a already merged part from another replica).
|
||||
@ -490,6 +499,12 @@ public:
|
||||
void setBrokenPartsToEnqueueFetchesOnLoading(Strings && parts_to_fetch);
|
||||
/// Must be called right after queue loading.
|
||||
void createLogEntriesToFetchBrokenParts();
|
||||
|
||||
/// Add an intent to block operations to start in the range. All intents must be removed by calling
|
||||
/// removeDropReplaceIntent(). The same intent can be added multiple times, but it has to be removed exactly
|
||||
/// the same amount of times.
|
||||
void addDropReplaceIntent(const MergeTreePartInfo& intent);
|
||||
void removeDropReplaceIntent(const MergeTreePartInfo& intent);
|
||||
};
|
||||
|
||||
using CommittingBlocks = std::unordered_map<String, std::set<Int64>>;
|
||||
|
@ -8006,6 +8006,20 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
||||
|
||||
assert(replace == !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range));
|
||||
|
||||
scope_guard intent_guard;
|
||||
if (replace)
|
||||
{
|
||||
queue.addDropReplaceIntent(drop_range);
|
||||
intent_guard = scope_guard{[this, my_drop_range = drop_range]() { queue.removeDropReplaceIntent(my_drop_range); }};
|
||||
|
||||
getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block);
|
||||
queue.waitForCurrentlyExecutingOpsInRange(drop_range);
|
||||
{
|
||||
auto pause_checking_parts = part_check_thread.pausePartsCheck();
|
||||
part_check_thread.cancelRemovedPartsCheck(drop_range);
|
||||
}
|
||||
}
|
||||
|
||||
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
|
||||
|
||||
std::set<String> replaced_parts;
|
||||
@ -8174,8 +8188,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
||||
lock2.reset();
|
||||
lock1.reset();
|
||||
|
||||
/// We need to pull the DROP_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost)
|
||||
/// We need to pull the REPLACE_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost)
|
||||
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
|
||||
// No need to block operations further, especially that in case we have to wait for mutation to finish, the intent would block
|
||||
// the execution of REPLACE_RANGE
|
||||
intent_guard.reset();
|
||||
parts_holder.clear();
|
||||
cleanup_thread.wakeup();
|
||||
|
||||
@ -8227,11 +8244,23 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
||||
Coordination::Stat alter_partition_version_stat;
|
||||
zookeeper->get(alter_partition_version_path, &alter_partition_version_stat);
|
||||
|
||||
MergeTreePartInfo drop_range;
|
||||
std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;
|
||||
MergeTreePartInfo drop_range;
|
||||
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true);
|
||||
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
|
||||
|
||||
queue.addDropReplaceIntent(drop_range);
|
||||
// Let's copy drop_range to make sure it doesn't get modified, otherwise we might run into issue on removal
|
||||
scope_guard intent_guard{[this, my_drop_range = drop_range]() { queue.removeDropReplaceIntent(my_drop_range); }};
|
||||
|
||||
getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block);
|
||||
|
||||
queue.waitForCurrentlyExecutingOpsInRange(drop_range);
|
||||
{
|
||||
auto pause_checking_parts = part_check_thread.pausePartsCheck();
|
||||
part_check_thread.cancelRemovedPartsCheck(drop_range);
|
||||
}
|
||||
|
||||
DataPartPtr covering_part;
|
||||
DataPartsVector src_all_parts;
|
||||
{
|
||||
@ -8436,6 +8465,9 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
||||
|
||||
/// We need to pull the DROP_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost)
|
||||
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
|
||||
// No need to block operations further, especially that in case we have to wait for mutation to finish, the intent would block
|
||||
// the execution of DROP_RANGE
|
||||
intent_guard.reset();
|
||||
parts_holder.clear();
|
||||
cleanup_thread.wakeup();
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user