This commit is contained in:
Vasily Nemkov 2024-09-19 11:45:51 +05:30 committed by GitHub
commit c0ae0240dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 745 additions and 34 deletions

View File

@ -17,7 +17,7 @@ public:
bool isCancelled() const { return *counter > 0; }
/// Temporarily blocks corresponding actions (while the returned object is alive)
/// Temporary blocks corresponding actions (while the returned object is alive)
friend class ActionLock;
ActionLock cancel() { return ActionLock(*this); }

View File

@ -0,0 +1,128 @@
#include <gtest/gtest.h>
#include <chrono>
#include <mutex>
#include <stdexcept>
#include <string_view>
#include <vector>
#include <thread>
#include <Common/ActionBlocker.h>
#include <Common/ActionLock.h>
using namespace DB;
// A freshly constructed blocker must not be cancelled and must report a zero counter.
TEST(ActionBlocker, TestDefaultConstructor)
{
ActionBlocker blocker;
EXPECT_FALSE(blocker.isCancelled());
EXPECT_EQ(0, blocker.getCounter().load());
}
// cancelForever() cancels the blocker without handing out an ActionLock:
// the blocker is reported as cancelled and the counter becomes 1.
TEST(ActionBlocker, TestCancelForever)
{
ActionBlocker blocker;
blocker.cancelForever();
EXPECT_TRUE(blocker.isCancelled());
EXPECT_EQ(1, blocker.getCounter().load());
}
// cancel() blocks only while the returned ActionLock is alive.
TEST(ActionBlocker, TestCancel)
{
ActionBlocker blocker;
{
auto lock = blocker.cancel();
EXPECT_TRUE(blocker.isCancelled());
EXPECT_EQ(1, blocker.getCounter().load());
}
// automatically un-cancelled on `lock` destruction
EXPECT_FALSE(blocker.isCancelled());
}
// A default-constructed lock is not attached to any blocker and reports itself as expired.
TEST(ActionLock, TestDefaultConstructor)
{
ActionLock locker;
EXPECT_TRUE(locker.expired());
}
// Constructing a lock directly from a blocker cancels that blocker
// and yields a lock that is not expired.
TEST(ActionLock, TestConstructorWithActionBlocker)
{
ActionBlocker blocker;
ActionLock lock(blocker);
EXPECT_FALSE(lock.expired());
EXPECT_TRUE(blocker.isCancelled());
EXPECT_EQ(1, blocker.getCounter().load());
}
// Move-assigning into an empty lock transfers the cancellation:
// the blocker stays cancelled until the target lock is destroyed.
TEST(ActionLock, TestMoveAssignmentToEmpty)
{
ActionBlocker blocker;
{
ActionLock lock;
lock = blocker.cancel();
EXPECT_TRUE(blocker.isCancelled());
}
// automatically un-cancelled on `lock` destruction
EXPECT_FALSE(blocker.isCancelled());
EXPECT_EQ(0, blocker.getCounter().load());
}
// Move-assigning a new lock on the SAME blocker over an existing lock must keep the
// blocker cancelled, and must leave the counter balanced (zero) once the lock is gone.
TEST(ActionLock, TestMoveAssignmentToNonEmpty)
{
ActionBlocker blocker;
{
auto lock = blocker.cancel();
EXPECT_TRUE(blocker.isCancelled());
// cause a move
lock = blocker.cancel();
// blocker should remain locked
EXPECT_TRUE(blocker.isCancelled());
}
// automatically un-cancelled on `lock` destruction
EXPECT_FALSE(blocker.isCancelled());
EXPECT_EQ(0, blocker.getCounter().load());
}
// Move-assigning a lock on a DIFFERENT blocker over an existing lock must release
// the previously held blocker and keep only the new one cancelled.
TEST(ActionLock, TestMoveAssignmentToNonEmpty2)
{
ActionBlocker blocker1;
ActionBlocker blocker2;
{
auto lock = blocker1.cancel();
EXPECT_TRUE(blocker1.isCancelled());
// cause a move
lock = blocker2.cancel();
// blocker2 must now be locked, blocker1 must be unlocked
EXPECT_TRUE(blocker2.isCancelled());
EXPECT_FALSE(blocker1.isCancelled());
}
// automatically un-cancelled on `lock` destruction
EXPECT_FALSE(blocker1.isCancelled());
EXPECT_FALSE(blocker2.isCancelled());
EXPECT_EQ(0, blocker1.getCounter().load());
EXPECT_EQ(0, blocker2.getCounter().load());
}
// A lock may outlive the blocker it was obtained from; once the blocker is destroyed
// the lock must report itself as expired.
TEST(ActionLock, TestExpiration)
{
ActionLock lock;
{
ActionBlocker blocker;
lock = blocker.cancel();
EXPECT_FALSE(lock.expired());
}
EXPECT_TRUE(lock.expired());
}

View File

@ -8,6 +8,7 @@
#include <IO/copyData.h>
#include <IO/ConnectionTimeouts.h>
#include <Common/Throttler.h>
#include <Common/ActionBlocker.h>
namespace zkutil

View File

@ -179,6 +179,20 @@ static void addMissedColumnsToSerializationInfos(
}
}
/// Tells whether this merge has to stop.
/// The merge is considered cancelled when either
///  - the merges blocker is cancelled (checked for the partition of `future_part`
///    when it is set, otherwise only the table-wide state is checked), or
///  - the merge list element itself was flagged as cancelled.
bool MergeTask::GlobalRuntimeContext::isCancelled() const
{
    const bool blocked_by_blocker = future_part
        ? merges_blocker->isCancelledForPartition(future_part->part_info.partition_id)
        : merges_blocker->isCancelled();

    if (blocked_by_blocker)
        return true;

    return merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed);
}
/// Throws Exception with ErrorCodes::ABORTED if the merge was cancelled in any way
/// (see isCancelled()); otherwise does nothing.
void MergeTask::GlobalRuntimeContext::checkOperationIsNotCanceled() const
{
    if (!isCancelled())
        return;

    throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts");
}
/// PK columns are sorted and merged, ordinary columns are gathered using info from merge step
void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColumns() const
{
@ -272,8 +286,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
const String local_tmp_suffix = global_ctx->parent_part ? global_ctx->suffix : "";
if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed))
throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts");
global_ctx->checkOperationIsNotCanceled();
/// We don't want to perform merge assigned with TTL as normal merge, so
/// throw exception
@ -518,9 +531,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
ctx->is_cancelled = [merges_blocker = global_ctx->merges_blocker,
ttl_merges_blocker = global_ctx->ttl_merges_blocker,
need_remove = ctx->need_remove_expired_values,
merge_list_element = global_ctx->merge_list_element_ptr]() -> bool
merge_list_element = global_ctx->merge_list_element_ptr,
partition_id = global_ctx->future_part->part_info.partition_id]() -> bool
{
return merges_blocker->isCancelled()
return merges_blocker->isCancelledForPartition(partition_id)
|| (need_remove && ttl_merges_blocker->isCancelled())
|| merge_list_element->is_cancelled.load(std::memory_order_relaxed);
};
@ -797,8 +811,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::finalize() const
global_ctx->merging_executor.reset();
global_ctx->merged_pipeline.reset();
if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed))
throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts");
global_ctx->checkOperationIsNotCanceled();
if (ctx->need_remove_expired_values && global_ctx->ttl_merges_blocker->isCancelled())
throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts with expired TTL");
@ -1061,8 +1074,7 @@ bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const
{
Block block;
if (global_ctx->merges_blocker->isCancelled()
|| global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed)
if (global_ctx->isCancelled()
|| !ctx->executor->pull(block))
return false;
@ -1078,8 +1090,7 @@ bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const
void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const
{
const String & column_name = ctx->it_name_and_type->name;
if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed))
throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts");
global_ctx->checkOperationIsNotCanceled();
ctx->executor.reset();
auto changed_checksums = ctx->column_to->fillChecksums(global_ctx->new_data_part, global_ctx->checksums_gathered_columns);

View File

@ -27,6 +27,7 @@
#include <Storages/MergeTree/MergeProgress.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/PartitionActionBlocker.h>
namespace ProfileEvents
{
@ -86,7 +87,7 @@ public:
MergeTreeTransactionPtr txn,
MergeTreeData * data_,
MergeTreeDataMergerMutator * mutator_,
ActionBlocker * merges_blocker_,
PartitionActionBlocker * merges_blocker_,
ActionBlocker * ttl_merges_blocker_)
{
global_ctx = std::make_shared<GlobalRuntimeContext>();
@ -161,7 +162,7 @@ private:
MergeListElement * merge_list_element_ptr{nullptr};
MergeTreeData * data{nullptr};
MergeTreeDataMergerMutator * mutator{nullptr};
ActionBlocker * merges_blocker{nullptr};
PartitionActionBlocker * merges_blocker{nullptr};
ActionBlocker * ttl_merges_blocker{nullptr};
StorageSnapshotPtr storage_snapshot{nullptr};
StorageMetadataPtr metadata_snapshot{nullptr};
@ -215,7 +216,12 @@ private:
MergeTreeData::MergingParams merging_params{};
scope_guard temporary_directory_lock;
UInt64 prev_elapsed_ms{0};
// will throw an exception if merge was cancelled in any way.
void checkOperationIsNotCanceled() const;
bool isCancelled() const;
};
using GlobalRuntimeContextPtr = std::shared_ptr<GlobalRuntimeContext>;

View File

@ -207,7 +207,7 @@ public :
/** Is used to cancel all merges and mutations. On cancel() call all currently running actions will throw exception soon.
* All new attempts to start a merge or mutation will throw an exception until all 'LockHolder' objects will be destroyed.
*/
ActionBlocker merges_blocker;
PartitionActionBlocker merges_blocker;
ActionBlocker ttl_merges_blocker;
private:

View File

@ -9,6 +9,7 @@
#include <Storages/Statistics/Statistics.h>
#include <Columns/ColumnsNumber.h>
#include <Parsers/queryToString.h>
#include <Interpreters/Context.h>
#include <Interpreters/Squashing.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Interpreters/PreparedSets.h>
@ -65,14 +66,6 @@ namespace ErrorCodes
namespace MutationHelpers
{
static bool checkOperationIsNotCanceled(ActionBlocker & merges_blocker, MergeListEntry * mutate_entry)
{
if (merges_blocker.isCancelled() || (*mutate_entry)->is_cancelled)
throw Exception(ErrorCodes::ABORTED, "Cancelled mutating parts");
return true;
}
static bool haveMutationsOfDynamicColumns(const MergeTreeData::DataPartPtr & data_part, const MutationCommands & commands)
{
for (const auto & command : commands)
@ -985,7 +978,7 @@ struct MutationContext
{
MergeTreeData * data;
MergeTreeDataMergerMutator * mutator;
ActionBlocker * merges_blocker;
PartitionActionBlocker * merges_blocker;
TableLockHolder * holder;
MergeListEntry * mutate_entry;
@ -1049,6 +1042,17 @@ struct MutationContext
scope_guard temporary_directory_lock;
/// Throws Exception with ErrorCodes::ABORTED if the mutation must stop, otherwise returns true
/// (the return value only exists so the call can be used inside a boolean expression).
///
/// The mutation is considered cancelled when either
///  - the merges blocker is cancelled (checked for the partition of `new_data_part`
///    when it is already known, otherwise only the table-wide state is checked), or
///  - the mutate entry itself was flagged as cancelled.
bool checkOperationIsNotCanceled() const
{
    /// NOTE: the conditional operator binds weaker than `||`, so the blocker check has to be
    /// evaluated on its own. Written as `a ? b : c || d` the entry flag would be silently
    /// skipped whenever `new_data_part` is set.
    const bool blocked_by_blocker = new_data_part
        ? merges_blocker->isCancelledForPartition(new_data_part->info.partition_id)
        : merges_blocker->isCancelled();

    if (blocked_by_blocker || (*mutate_entry)->is_cancelled)
    {
        throw Exception(ErrorCodes::ABORTED, "Cancelled mutating parts");
    }

    return true;
}
/// Whether we need to count lightweight delete rows in this mutation
bool count_lightweight_deleted_rows;
UInt64 execute_elapsed_ns = 0;
@ -1056,7 +1060,6 @@ struct MutationContext
using MutationContextPtr = std::shared_ptr<MutationContext>;
// This class is responsible for:
// 1. get projection pipeline and a sink to write parts
// 2. build an executor that can write block to the input stream (actually we can write through it to generate as many parts as possible)
@ -1186,9 +1189,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
Block cur_block;
Block projection_header;
MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry);
if (!ctx->mutating_executor->pull(cur_block))
if (!ctx->checkOperationIsNotCanceled() || !ctx->mutating_executor->pull(cur_block))
{
finalizeTempProjections();
return false;
@ -1897,7 +1898,7 @@ MutateTask::MutateTask(
const MergeTreeTransactionPtr & txn,
MergeTreeData & data_,
MergeTreeDataMergerMutator & mutator_,
ActionBlocker & merges_blocker_,
PartitionActionBlocker & merges_blocker_,
bool need_prefix_)
: ctx(std::make_shared<MutationContext>())
{
@ -1939,7 +1940,7 @@ bool MutateTask::execute()
}
case State::NEED_EXECUTE:
{
MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry);
ctx->checkOperationIsNotCanceled();
if (task->executeStep())
return true;
@ -2032,7 +2033,7 @@ static bool canSkipMutationCommandForPart(const MergeTreeDataPartPtr & part, con
bool MutateTask::prepare()
{
ProfileEvents::increment(ProfileEvents::MutationTotalParts);
MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry);
ctx->checkOperationIsNotCanceled();
if (ctx->future_part->parts.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to mutate {} parts, not one. "

View File

@ -4,6 +4,7 @@
#include <Storages/MergeTree/MergeProgress.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Storages/MergeTree/IMergedBlockOutputStream.h>
#include <Storages/MergeTree/PartitionActionBlocker.h>
#include <Storages/MutationCommands.h>
#include <Interpreters/MutationsInterpreter.h>
@ -35,7 +36,7 @@ public:
const MergeTreeTransactionPtr & txn,
MergeTreeData & data_,
MergeTreeDataMergerMutator & mutator_,
ActionBlocker & merges_blocker_,
PartitionActionBlocker & merges_blocker_,
bool need_prefix_);
bool execute();

View File

@ -0,0 +1,86 @@
#include <Storages/MergeTree/PartitionActionBlocker.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
namespace DB
{
/// Cancels actions for `partition_id` and returns the lock that keeps them cancelled.
/// A blocker for the partition is created on first use.
ActionLock PartitionActionBlocker::cancelForPartition(const std::string & partition_id)
{
    std::unique_lock lock(mutex);

    /// `inserted` tells whether a brand new blocker had to be created for this partition.
    auto [blocker_it, inserted] = partition_blockers.try_emplace(partition_id);
    ActionLock result = blocker_it->second.cancel();

    /// Stale `ActionBlocker` instances are dropped once in a while so the map does not grow
    /// without bound. Only do it when the map has just grown; 32 is an arbitrary threshold.
    ++cleanup_counter;
    if (inserted && cleanup_counter > 32)
        compactPartitionBlockersLocked();

    return result;
}
/// Returns true when actions on `partition_id` are cancelled, either by the global lock
/// or by a lock taken for this particular partition.
bool PartitionActionBlocker::isCancelledForPartition(const std::string & partition_id) const
{
    /// The global state is checked first, before `mutex` is taken.
    if (!isCancelled())
    {
        std::shared_lock lock(mutex);
        return isCancelledForPartitionOnlyLocked(partition_id);
    }

    return true;
}
/// Checks the partition-specific blocker only (the global blocker is ignored).
/// Does not lock `mutex` itself; callers in this file hold it while calling.
bool PartitionActionBlocker::isCancelledForPartitionOnlyLocked(const std::string & partition_id) const
{
    const auto blocker_it = partition_blockers.find(partition_id);
    if (blocker_it == partition_blockers.end())
        return false;

    return blocker_it->second.isCancelled();
}
/// Number of per-partition blockers currently stored, including those that are
/// no longer cancelled but have not been compacted away yet.
size_t PartitionActionBlocker::countPartitionBlockers() const
{
    std::shared_lock guard(mutex);
    const size_t blockers_count = partition_blockers.size();
    return blockers_count;
}
/// Removes all per-partition blockers that are not cancelled at the moment.
void PartitionActionBlocker::compactPartitionBlockers()
{
    /// Exclusive access: the map is modified.
    std::lock_guard guard(mutex);
    compactPartitionBlockersLocked();
}
/// Drops every per-partition blocker that is not cancelled and restarts the cleanup countdown.
/// Does not lock `mutex` itself; callers in this file hold it exclusively while calling.
void PartitionActionBlocker::compactPartitionBlockersLocked()
{
    const auto is_stale = [](const auto & entry)
    {
        const bool still_cancelled = entry.second.isCancelled();
        return !still_cancelled;
    };

    std::erase_if(partition_blockers, is_stale);
    cleanup_counter = 0;
}
/// Renders a human-readable dump of the blocker state: the counter of the global blocker,
/// followed by the counter of every stored per-partition blocker.
std::string PartitionActionBlocker::formatDebug() const
{
    std::shared_lock lock(mutex);

    const size_t total = partition_blockers.size();

    WriteBufferFromOwnString out;
    out << "Global lock: " << global_blocker.getCounter().load()
        << "\n"
        << total << " live partition locks: {";

    size_t printed = 0;
    for (const auto & [partition_id, partition_blocker] : partition_blockers)
    {
        out << "\n\t" << DB::double_quote << partition_id << ": " << partition_blocker.getCounter().load();

        /// Entries are comma-separated; the last one is followed by a line break instead.
        ++printed;
        if (printed == total)
            out << "\n";
        else
            out << ",";
    }

    out << "}";
    return out.str();
}
}

View File

@ -0,0 +1,69 @@
#pragma once
#include <memory>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <Common/ActionLock.h>
#include <Common/ActionBlocker.h>
namespace DB
{
/// Locks actions on given MergeTree-family table.
/// Like ActionBlocker, but has two modes:
/// - 'partition-specific' lock, @see `cancelForPartition()`, and `isCancelledForPartition()`.
/// Only actions on *specific partition* are prohibited to start
/// and must interrupt at the first chance.
/// There could be arbitrary number of partitions locked simultaneously.
///
/// - 'global' lock, @see `cancel()`, `isCancelled()`, and `cancelForever()`
/// Any new actions for *ALL* partitions are prohibited to start
/// and are required to interrupt at first possible moment.
/// As if all individual partitions were locked with 'partition-specific lock'.
///
/// Any lock can be set multiple times and considered fully un-locked
/// only when it was un-locked same number of times (by destructing/resetting of `ActionLock`).
///
/// There could be any number of locks active at given point on single PartitionActionBlocker instance:
/// - 'global lock' locked `N` times.
/// - `M` 'partition lock's each locked different number of times.
class PartitionActionBlocker
{
public:
PartitionActionBlocker() = default;
/// True if the 'global' lock is active. Partition-specific locks are NOT taken into account.
bool isCancelled() const { return global_blocker.isCancelled(); }
/// True if either the 'global' lock or the lock of the given partition is active.
bool isCancelledForPartition(const std::string & /*partition_id*/) const;
/// Temporarily blocks corresponding actions (while the returned object is alive)
friend class ActionLock;
/// Takes the 'global' lock.
ActionLock cancel() { return ActionLock(global_blocker); }
/// Takes the lock of the given partition only; a blocker for the partition is created on first use.
/// May also drop stale per-partition blockers as a side effect (see `compactPartitionBlockers()`).
ActionLock cancelForPartition(const std::string & partition_id);
/// Cancel the actions forever.
void cancelForever() { global_blocker.cancelForever(); }
/// Counter of the 'global' lock only; per-partition counters are not included.
const std::atomic<int> & getCounter() const { return global_blocker.getCounter(); }
/// Number of per-partition blockers currently stored (including stale, not yet compacted ones).
size_t countPartitionBlockers() const;
/// Cleanup partition blockers: removes every per-partition blocker that is not cancelled right now.
void compactPartitionBlockers();
/// Human-readable dump of the global counter and all stored per-partition counters.
std::string formatDebug() const;
private:
/// The `...Locked` helpers do not take `mutex` themselves; the public methods lock it before calling them.
void compactPartitionBlockersLocked();
bool isCancelledForPartitionOnlyLocked(const std::string & partition_id) const;
ActionBlocker global_blocker;
/// Guards `partition_blockers` and `cleanup_counter`.
mutable std::shared_mutex mutex;
std::unordered_map<std::string, ActionBlocker> partition_blockers;
/// Number of `cancelForPartition()` calls since the last compaction.
size_t cleanup_counter = 0;
};
}

View File

@ -0,0 +1,257 @@
#include <gtest/gtest.h>
#include <vector>
#include <thread>
#include <fmt/format.h>
#include <Storages/MergeTree/PartitionActionBlocker.h>
#include <iomanip>
using namespace DB;
// A freshly constructed blocker must not report the global lock as active.
TEST(PartitionActionBlocker, TestDefaultConstructor)
{
PartitionActionBlocker blocker;
EXPECT_FALSE(blocker.isCancelled());
// EXPECT_EQ(0, blocker.getCounter().load());
}
// cancelForever() activates the global lock without handing out an ActionLock.
TEST(PartitionActionBlocker, TestCancelForever)
{
PartitionActionBlocker blocker;
blocker.cancelForever();
EXPECT_TRUE(blocker.isCancelled());
// EXPECT_EQ(1, blocker.getCounter().load());
}
// The global lock is active only while the ActionLock returned by cancel() is alive.
TEST(PartitionActionBlocker, TestCancel)
{
PartitionActionBlocker blocker;
{
auto lock = blocker.cancel();
EXPECT_TRUE(blocker.isCancelled());
// EXPECT_EQ(1, blocker.getCounter().load());
}
// automatically un-cancelled on `lock` destruction
EXPECT_FALSE(blocker.isCancelled());
}
// A partition-specific lock affects only its own partition: the global lock
// and other partitions stay unlocked, and the partition is released with the lock.
TEST(PartitionActionBlocker, TestCancelForPartition)
{
const std::string partition_id = "some partition id";
const std::string partition_id2 = "some other partition id";
PartitionActionBlocker blocker;
{
auto lock = blocker.cancelForPartition(partition_id);
// blocker is not locked fully
EXPECT_FALSE(blocker.isCancelled());
// EXPECT_EQ(0, blocker.getCounter().load());
// blocker reports that only partition is locked
EXPECT_TRUE(blocker.isCancelledForPartition(partition_id));
// doesn't affect other partitions
EXPECT_FALSE(blocker.isCancelledForPartition(partition_id2));
}
// the partition lock is lifted once `lock` is destroyed
EXPECT_FALSE(blocker.isCancelled());
EXPECT_FALSE(blocker.isCancelledForPartition(partition_id));
}
// Two different partitions can be locked at the same time without activating the global lock,
// and BOTH must be released once their locks are destroyed.
TEST(PartitionActionBlocker, TestCancelForTwoPartitions)
{
    const std::string partition_id1 = "some partition id";
    const std::string partition_id2 = "some other partition id";
    PartitionActionBlocker blocker;
    {
        auto lock1 = blocker.cancelForPartition(partition_id1);
        auto lock2 = blocker.cancelForPartition(partition_id2);
        // blocker is not locked fully
        EXPECT_FALSE(blocker.isCancelled());
        // EXPECT_EQ(0, blocker.getCounter().load());
        // blocker reports that both partitions are locked
        EXPECT_TRUE(blocker.isCancelledForPartition(partition_id1));
        EXPECT_TRUE(blocker.isCancelledForPartition(partition_id2));
    }
    // blocker is not locked fully
    EXPECT_FALSE(blocker.isCancelled());
    // EXPECT_EQ(0, blocker.getCounter().load());
    // blocker reports that neither partition is locked anymore
    EXPECT_FALSE(blocker.isCancelledForPartition(partition_id1));
    EXPECT_FALSE(blocker.isCancelledForPartition(partition_id2));
}
// Partition locks are counted: locking the same partition twice requires both locks to go away.
TEST(PartitionActionBlocker, TestCancelForSamePartitionTwice)
{
// Partition is unlocked only when all locks are destroyed.
const std::string partition_id = "some partition id";
const std::string partition_id2 = "some other partition id";
// Lock `partition_id` twice, make sure that global lock
// and other partitions are unaffected.
// Check that `partition_id` is unlocked only after both locks are destroyed.
PartitionActionBlocker blocker;
{
auto lock1 = blocker.cancelForPartition(partition_id);
{
auto lock2 = blocker.cancelForPartition(partition_id);
EXPECT_FALSE(blocker.isCancelled());
EXPECT_TRUE(blocker.isCancelledForPartition(partition_id));
EXPECT_FALSE(blocker.isCancelledForPartition(partition_id2));
}
// `lock2` is gone, but `lock1` still keeps the partition locked
EXPECT_FALSE(blocker.isCancelled());
EXPECT_TRUE(blocker.isCancelledForPartition(partition_id));
EXPECT_FALSE(blocker.isCancelledForPartition(partition_id2));
}
// All locks lifted
EXPECT_FALSE(blocker.isCancelled());
EXPECT_FALSE(blocker.isCancelledForPartition(partition_id));
}
// While the global lock is held, every partition is reported as locked,
// regardless of whether a partition-specific lock is taken or released meanwhile.
TEST(PartitionActionBlocker, TestCancelAndThenCancelForPartition)
{
// The global lock is taken first and released last; a partition lock lives inside its scope.
const std::string partition_id = "some partition id";
const std::string partition_id2 = "some other partition id";
PartitionActionBlocker blocker;
{
auto global_lock = blocker.cancel();
{
auto lock1 = blocker.cancelForPartition(partition_id);
EXPECT_TRUE(blocker.isCancelled());
EXPECT_TRUE(blocker.isCancelledForPartition(partition_id));
EXPECT_TRUE(blocker.isCancelledForPartition(partition_id2));
}
// the partition lock is gone, but the global lock still covers all partitions
EXPECT_TRUE(blocker.isCancelled());
EXPECT_TRUE(blocker.isCancelledForPartition(partition_id));
EXPECT_TRUE(blocker.isCancelledForPartition(partition_id2));
}
// All locks lifted
EXPECT_FALSE(blocker.isCancelled());
EXPECT_FALSE(blocker.isCancelledForPartition(partition_id));
EXPECT_FALSE(blocker.isCancelledForPartition(partition_id2));
}
// Releasing the global lock must not release a partition lock that was taken before it.
TEST(PartitionActionBlocker, TestCancelForPartitionAndThenCancel)
{
// The partition lock is taken first and released last; the global lock lives inside its scope.
const std::string partition_id = "some partition id";
const std::string partition_id2 = "some other partition id";
PartitionActionBlocker blocker;
{
auto lock1 = blocker.cancelForPartition(partition_id);
{
auto global_lock = blocker.cancel();
EXPECT_TRUE(blocker.isCancelled());
EXPECT_TRUE(blocker.isCancelledForPartition(partition_id));
EXPECT_TRUE(blocker.isCancelledForPartition(partition_id2));
}
// global_locked is 'no more', so only 1 partition is locked now.
EXPECT_FALSE(blocker.isCancelled());
EXPECT_TRUE(blocker.isCancelledForPartition(partition_id));
EXPECT_FALSE(blocker.isCancelledForPartition(partition_id2));
}
// All locks lifted
EXPECT_FALSE(blocker.isCancelled());
EXPECT_FALSE(blocker.isCancelledForPartition(partition_id));
EXPECT_FALSE(blocker.isCancelledForPartition(partition_id2));
}
// cancelForPartition() must occasionally drop stale per-partition blockers on its own,
// so the number of stored blockers stays below the number of partitions ever locked.
TEST(PartitionActionBlocker, TestAutomaticCompactPartitionBlockers)
{
const size_t partitions_count = 100;
const std::string partition_id = "some partition id";
PartitionActionBlocker blocker;
for (size_t i = 0; i < partitions_count; ++i)
{
// The returned lock is discarded on purpose: it is released immediately, leaving a stale blocker behind.
blocker.cancelForPartition(partition_id + "_" + std::to_string(i));
}
// Automatic cleanup happens once in a while, 100 stale locks should trigger it.
EXPECT_LT(blocker.countPartitionBlockers(), partitions_count);
}
// compactPartitionBlockers() must remove every blocker whose locks were all released.
TEST(PartitionActionBlocker, TestCompactPartitionBlockers)
{
const size_t partitions_count = 100;
const std::string partition_id = "some partition id";
PartitionActionBlocker blocker;
for (size_t i = 0; i < partitions_count; ++i)
{
// The returned lock is discarded on purpose: it is released immediately, leaving a stale blocker behind.
blocker.cancelForPartition(partition_id + "_" + std::to_string(i));
}
// Manually cleanup all stale blockers (all blockers in this case).
blocker.compactPartitionBlockers();
EXPECT_EQ(0, blocker.countPartitionBlockers());
}
// compactPartitionBlockers() must keep blockers that are still held by a live lock
// and remove only the stale ones.
TEST(PartitionActionBlocker, TestCompactPartitionBlockersDoesntRemoveActiveBlockers)
{
const size_t partitions_count = 100;
const std::string partition_id = "some partition id";
PartitionActionBlocker blocker;
auto lock_foo = blocker.cancelForPartition("FOO");
for (size_t i = 0; i < partitions_count; ++i)
{
// The returned lock is discarded on purpose: it is released immediately, leaving a stale blocker behind.
blocker.cancelForPartition(partition_id + "_" + std::to_string(i));
}
auto lock_bar = blocker.cancelForPartition("BAR");
EXPECT_LT(2, blocker.countPartitionBlockers());
// Manually cleanup all stale blockers (all except held by lock_foo and lock_bar).
blocker.compactPartitionBlockers();
EXPECT_EQ(2, blocker.countPartitionBlockers());
}
// Smoke test for formatDebug() with a mix of global, repeated, stale and live partition locks.
TEST(PartitionActionBlocker, TestFormatDebug)
{
// Do not validate contents, just make sure that something is printed out
const size_t partitions_count = 100;
const std::string partition_id = "some partition id";
PartitionActionBlocker blocker;
auto global_lock = blocker.cancel();
auto lock_foo = blocker.cancelForPartition("FOO");
auto lock_foo2 = blocker.cancelForPartition("FOO");
for (size_t i = 0; i < partitions_count; ++i)
{
// The returned lock is discarded on purpose: it is released immediately, leaving a stale blocker behind.
blocker.cancelForPartition(partition_id + "_" + std::to_string(i));
}
auto lock_bar = blocker.cancelForPartition("BAR");
EXPECT_NE("", blocker.formatDebug());
}

View File

@ -1145,7 +1145,7 @@ bool StorageMergeTree::merge(
{
std::unique_lock lock(currently_processing_in_background_mutex);
if (merger_mutator.merges_blocker.isCancelled())
if (merger_mutator.merges_blocker.isCancelledForPartition(partition_id))
throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts");
merge_mutate_entry = selectPartsToMerge(
@ -1413,8 +1413,19 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
has_mutations = !current_mutations_by_version.empty();
}
auto isCancelled = [&merges_blocker = merger_mutator.merges_blocker](const MergeMutateSelectedEntryPtr & entry)
{
if (entry->future_part)
return merges_blocker.isCancelledForPartition(entry->future_part->part_info.partition_id);
return merges_blocker.isCancelled();
};
if (merge_entry)
{
if (isCancelled(merge_entry))
return false;
auto task = std::make_shared<MergePlainMergeTreeTask>(*this, metadata_snapshot, /* deduplicate */ false, Names{}, /* cleanup */ false, merge_entry, shared_lock, common_assignee_trigger);
task->setCurrentTransaction(std::move(transaction_for_merge), std::move(txn));
bool scheduled = assignee.scheduleMergeMutateTask(task);
@ -1426,6 +1437,9 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
}
if (mutate_entry)
{
if (isCancelled(mutate_entry))
return false;
/// We take new metadata snapshot here. It's because mutation commands can be executed only with metadata snapshot
/// which is equal or more fresh than commands themselves. In extremely rare case it can happen that we will have alter
/// in between we took snapshot above and selected commands. That is why we take new snapshot here.
@ -1645,6 +1659,66 @@ bool StorageMergeTree::optimize(
return true;
}
namespace
{

/// Sums up, over all parts of `needle`, how many times each of them is found in `haystack`.
size_t countOccurrences(const StorageMergeTree::DataParts & haystack, const DataPartsVector & needle)
{
    size_t matches = 0;
    for (const auto & part : needle)
    {
        matches += haystack.count(part);
    }
    return matches;
}

/// Lazy view over `parts` that yields `getNameWithState()` of every part (handy for logging).
/// The view refers to `parts`, so it must not outlive the container it was built from.
auto getNameWithState(const auto & parts)
{
    auto describe_part = [](const auto & part)
    {
        return part->getNameWithState();
    };
    return std::views::transform(parts, describe_part);
}

}
// Same as stopMergesAndWait, but waits only for merges on parts belonging to a certain partition.
// Returns the ActionLock that keeps merges/mutations in `partition_id` cancelled while it is alive.
// Throws TIMEOUT_EXCEEDED if a wait on the condition variable times out while affected parts are still being processed.
ActionLock StorageMergeTree::stopMergesAndWaitForPartition(String partition_id)
{
LOG_DEBUG(log, "StorageMergeTree::stopMergesAndWaitForPartition partition_id: `{}`", partition_id);
/// Cancel merges in the given partition and prevent new ones from starting there, BUT unlike stopMergesAndWait(),
/// only wait for the merges on a small set of parts (active parts of this partition) to finish.
std::unique_lock lock(currently_processing_in_background_mutex);
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.merges_blocker.cancelForPartition(partition_id);
/// Snapshot of the parts whose background processing has to finish before we may proceed.
const DataPartsVector parts_to_wait = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, partition_id);
LOG_TRACE(log, "StorageMergeTree::stopMergesAndWaitForPartition parts to wait: {} ({} items)",
fmt::join(getNameWithState(parts_to_wait), ", "), parts_to_wait.size());
LOG_DEBUG(log, "StorageMergeTree::stopMergesAndWaitForPartition all mutating parts: {} ({} items)",
fmt::join(getNameWithState(currently_merging_mutating_parts), ", "), currently_merging_mutating_parts.size());
/// Re-check after every wake-up: loop until none of `parts_to_wait` is being merged or mutated.
while (size_t still_merging = countOccurrences(currently_merging_mutating_parts, parts_to_wait))
{
LOG_DEBUG(log, "StorageMergeTree::stopMergesAndWaitForPartition Waiting for currently running merges ({} {} parts are merging right now)",
fmt::join(getNameWithState(currently_merging_mutating_parts), ", "), still_merging);
/// NOTE: the timeout applies to each individual wait, not to the whole loop.
if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for(
lock, std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC)))
{
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout while waiting for already running merges");
}
}
LOG_DEBUG(log, "StorageMergeTree::stopMergesAndWaitForPartition done waiting, still merging {} ({} items)",
fmt::join(getNameWithState(currently_merging_mutating_parts), ", "), currently_merging_mutating_parts.size());
return merge_blocker;
}
ActionLock StorageMergeTree::stopMergesAndWait()
{
/// TODO allow to stop merges in specific partition only (like it's done in ReplicatedMergeTree)
@ -2079,10 +2153,14 @@ PartitionCommandsResultInfo StorageMergeTree::attachPartition(
void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr local_context)
{
assertNotReadonly();
LOG_DEBUG(log, "StorageMergeTree::replacePartitionFrom\tsource_table: {}, replace: {}", source_table->getStorageID().getShortName(), replace);
auto lock1 = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
auto lock2 = source_table->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
auto merges_blocker = stopMergesAndWait();
const String partition_id = getPartitionIDFromQuery(partition, local_context);
auto merges_blocker = stopMergesAndWaitForPartition(partition_id);
auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr();
auto my_metadata_snapshot = getInMemoryMetadataPtr();
@ -2091,7 +2169,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, my_metadata_snapshot);
DataPartsVector src_parts;
String partition_id;
bool is_all = partition->as<ASTPartition>()->all;
if (is_all)
{

View File

@ -189,6 +189,7 @@ private:
/// If not force, then take merges selector and check that part is not participating in background operations.
MergeTreeDataPartPtr outdatePart(MergeTreeTransaction * txn, const String & part_name, bool force, bool clear_without_timeout = true);
ActionLock stopMergesAndWait();
ActionLock stopMergesAndWaitForPartition(String partition_id);
/// Allocate block number for new mutation, write mutation to disk
/// and into in-memory structures. Wake up merge-mutation task.

View File

@ -0,0 +1,33 @@
-- https://github.com/ClickHouse/ClickHouse/issues/45328
-- Check that replacing one partition on a table with `ALTER TABLE REPLACE PARTITION`
-- doesn't wait for mutations on other partitions.
-- { echo }
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
CREATE TABLE t1
(
`p` UInt8,
`i` UInt64
)
ENGINE = MergeTree
PARTITION BY p
ORDER BY tuple();
INSERT INTO t1 VALUES (1, 1), (2, 2);
CREATE TABLE t2 AS t1;
INSERT INTO t2 VALUES (2, 2000);
-- mutation that is supposed to be running in bg while REPLACE is performed.
-- sleep(1) is arbitrary here, we just need mutation to take long enough to be noticeable.
ALTER TABLE t1 UPDATE i = if(sleep(1), 0, 9000) IN PARTITION id '1' WHERE p == 1;
-- check that mutation is started
SELECT not(is_done) as is_running FROM system.mutations WHERE database==currentDatabase() AND table=='t1';
1
ALTER TABLE t1 REPLACE PARTITION id '2' FROM t2;
-- check that mutation is still running
SELECT is_done, latest_fail_reason, parts_to_do FROM system.mutations WHERE database==currentDatabase() AND table=='t1';
0 1
-- Expecting that mutation hasn't finished yet (since ALTER TABLE .. REPLACE wasn't waiting for it),
-- so row with `p == 1` still has the old value of `i == 1`.
SELECT * FROM t1 ORDER BY p;
1 1
2 2000

View File

@ -0,0 +1,39 @@
-- https://github.com/ClickHouse/ClickHouse/issues/45328
-- Check that replacing one partition on a table with `ALTER TABLE REPLACE PARTITION`
-- doesn't wait for mutations on other partitions.
-- { echo }
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
CREATE TABLE t1
(
`p` UInt8,
`i` UInt64
)
ENGINE = MergeTree
PARTITION BY p
ORDER BY tuple();
INSERT INTO t1 VALUES (1, 1), (2, 2);
CREATE TABLE t2 AS t1;
INSERT INTO t2 VALUES (2, 2000);
-- mutation that is supposed to be running in background while REPLACE is performed.
-- sleep(1) is arbitrary here, we just need mutation to take long enough to be noticeable.
ALTER TABLE t1 UPDATE i = if(sleep(1), 0, 9000) IN PARTITION id '1' WHERE p == 1;
-- check that mutation is started
SELECT not(is_done) as is_running FROM system.mutations WHERE database==currentDatabase() AND table=='t1';
ALTER TABLE t1 REPLACE PARTITION id '2' FROM t2;
-- check that mutation is still running
SELECT is_done, latest_fail_reason, parts_to_do FROM system.mutations WHERE database==currentDatabase() AND table=='t1';
-- Expecting that mutation hasn't finished yet (since ALTER TABLE .. REPLACE wasn't waiting for it),
-- so row with `p == 1` still has the old value of `i == 1`.
SELECT * FROM t1 ORDER BY p;
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;