From b24e415b0abfafe30a86bb9554fa9693efae59ec Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Wed, 15 Mar 2023 12:15:57 +0100 Subject: [PATCH 01/17] Using PartitionActionBlocker to block merges on certain partitions + some tests --- src/Common/tests/gtest_action_blocker.cpp | 129 ++++++ src/Storages/MergeTree/DataPartsExchange.h | 1 + src/Storages/MergeTree/MergeTask.cpp | 31 +- src/Storages/MergeTree/MergeTask.h | 10 +- .../MergeTree/MergeTreeDataMergerMutator.h | 2 +- src/Storages/MergeTree/MutateTask.cpp | 23 +- src/Storages/MergeTree/MutateTask.h | 3 +- .../MergeTree/PartitionActionBlocker.cpp | 85 ++++ .../MergeTree/PartitionActionBlocker.h | 68 +++ .../tests/gtest_partition_action_blocker.cpp | 405 ++++++++++++++++++ src/Storages/StorageMergeTree.cpp | 76 +++- src/Storages/StorageMergeTree.h | 1 + ...t_merges_on_unrelated_partitions.reference | 49 +++ ...not_wait_merges_on_unrelated_partitions.sh | 85 ++++ ...utations_on_unrelated_partitions.reference | 33 ++ ...wait_mutations_on_unrelated_partitions.sql | 36 ++ 16 files changed, 1013 insertions(+), 24 deletions(-) create mode 100644 src/Common/tests/gtest_action_blocker.cpp create mode 100644 src/Storages/MergeTree/PartitionActionBlocker.cpp create mode 100644 src/Storages/MergeTree/PartitionActionBlocker.h create mode 100644 src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp create mode 100644 tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.reference create mode 100755 tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh create mode 100644 tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.reference create mode 100644 tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.sql diff --git a/src/Common/tests/gtest_action_blocker.cpp b/src/Common/tests/gtest_action_blocker.cpp new file mode 100644 index 00000000000..687f77da679 --- /dev/null +++ b/src/Common/tests/gtest_action_blocker.cpp @@ -0,0 +1,129 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include +#include + +using namespace DB; + +TEST(ActionBlocker, TestDefaultConstructor) +{ + ActionBlocker blocker; + + EXPECT_FALSE(blocker.isCancelled()); + EXPECT_EQ(0, blocker.getCounter().load()); +} + +TEST(ActionBlocker, TestCancelForever) +{ + ActionBlocker blocker; + + blocker.cancelForever(); + EXPECT_TRUE(blocker.isCancelled()); + EXPECT_EQ(1, blocker.getCounter().load()); +} + +TEST(ActionBlocker, TestCancel) +{ + const std::string partition_id = "some partition id"; + ActionBlocker blocker; + + { + auto lock = blocker.cancel(); + EXPECT_TRUE(blocker.isCancelled()); + EXPECT_EQ(1, blocker.getCounter().load()); + } + // automatically un-cancelled on `lock` destruction + EXPECT_FALSE(blocker.isCancelled()); +} + + + +TEST(ActionLock, TestDefaultConstructor) +{ + ActionLock locker; + EXPECT_TRUE(locker.expired()); +} + +TEST(ActionLock, TestConstructorWithActionBlocker) +{ + ActionBlocker blocker; + ActionLock lock(blocker); + + EXPECT_FALSE(lock.expired()); + EXPECT_TRUE(blocker.isCancelled()); + EXPECT_EQ(1, blocker.getCounter().load()); +} + +TEST(ActionLock, TestMoveAssignmentToEmpty) +{ + ActionBlocker blocker; + + { + ActionLock lock; + lock = blocker.cancel(); + EXPECT_TRUE(blocker.isCancelled()); + } + // automatically un-cancelled on `lock` destruction + EXPECT_FALSE(blocker.isCancelled()); + EXPECT_EQ(0, blocker.getCounter().load()); +} + +TEST(ActionLock, TestMoveAssignmentToNonEmpty) +{ + ActionBlocker blocker; + { + auto lock = blocker.cancel(); + EXPECT_TRUE(blocker.isCancelled()); + + // cause a move + lock = blocker.cancel(); + + // blocker should remain locked + EXPECT_TRUE(blocker.isCancelled()); + } + // automatically un-cancelled on `lock` destruction + EXPECT_FALSE(blocker.isCancelled()); + EXPECT_EQ(0, blocker.getCounter().load()); +} + +TEST(ActionLock, TestMoveAssignmentToNonEmpty2) +{ + ActionBlocker blocker1; + ActionBlocker blocker2; + { + auto lock = blocker1.cancel(); + EXPECT_TRUE(blocker1.isCancelled()); + + // cause a move + lock = blocker2.cancel(); + + // blocker2 be remain locked, blocker1 - unlocked + EXPECT_TRUE(blocker2.isCancelled()); + + EXPECT_FALSE(blocker1.isCancelled()); + } + // automatically un-cancelled on `lock` destruction + EXPECT_FALSE(blocker1.isCancelled()); + EXPECT_FALSE(blocker2.isCancelled()); + EXPECT_EQ(0, blocker1.getCounter().load()); + EXPECT_EQ(0, blocker2.getCounter().load()); +} + +TEST(ActionLock, TestExpiration) +{ + ActionLock lock; + { + ActionBlocker blocker; + lock = blocker.cancel(); + EXPECT_FALSE(lock.expired()); + } + + EXPECT_TRUE(lock.expired()); +} diff --git a/src/Storages/MergeTree/DataPartsExchange.h b/src/Storages/MergeTree/DataPartsExchange.h index 8c15dc3cfdb..d0b86f9a5ee 100644 --- a/src/Storages/MergeTree/DataPartsExchange.h +++ b/src/Storages/MergeTree/DataPartsExchange.h @@ -8,6 +8,7 @@ #include #include #include +#include namespace zkutil diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index aa38198334e..719a2d97c9a 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -128,6 +128,20 @@ static void addMissedColumnsToSerializationInfos( } } +bool MergeTask::GlobalRuntimeContext::isCancelled() const +{ + return merges_blocker->isCancelledForPartition(future_part->part_info.partition_id) + || merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed); +} + +void MergeTask::GlobalRuntimeContext::checkOperationIsNotCanceled() const +{ + if (isCancelled()) + { + throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts"); + } +} + bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() { @@ -140,8 +154,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() } const String local_tmp_suffix = global_ctx->parent_part ? ctx->suffix : ""; - if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed)) - throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts"); + global_ctx->checkOperationIsNotCanceled(); /// We don't want to perform merge assigned with TTL as normal merge, so /// throw exception @@ -391,9 +404,11 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() ctx->is_cancelled = [merges_blocker = global_ctx->merges_blocker, ttl_merges_blocker = global_ctx->ttl_merges_blocker, need_remove = ctx->need_remove_expired_values, - merge_list_element = global_ctx->merge_list_element_ptr]() -> bool + merge_list_element = global_ctx->merge_list_element_ptr, + partition_id = global_ctx->future_part->part_info.partition_id]() -> bool { - return merges_blocker->isCancelled() + // TODO(vnemkov): we might want capture global_ctx here and use it's `isCancelled` method, but I'm not sure about object lifetimes. + return merges_blocker->isCancelledForPartition(partition_id) || (need_remove && ttl_merges_blocker->isCancelled()) || merge_list_element->is_cancelled.load(std::memory_order_relaxed); }; @@ -478,8 +493,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() global_ctx->merging_executor.reset(); global_ctx->merged_pipeline.reset(); - if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed)) - throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts"); + global_ctx->checkOperationIsNotCanceled(); if (ctx->need_remove_expired_values && global_ctx->ttl_merges_blocker->isCancelled()) throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts with expired TTL"); @@ -634,7 +648,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const { Block block; - if (!global_ctx->merges_blocker->isCancelled() && !global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed) + if (!global_ctx->isCancelled() && ctx->executor->pull(block)) { ctx->column_elems_written += block.rows(); @@ -650,8 +664,7 @@ bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const { const String & column_name = ctx->it_name_and_type->name; - if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed)) - throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts"); + global_ctx->checkOperationIsNotCanceled(); ctx->executor.reset(); auto changed_checksums = ctx->column_to->fillChecksums(global_ctx->new_data_part, global_ctx->checksums_gathered_columns); diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index 1f50e55f8a0..fcf985ff7db 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -24,7 +24,7 @@ #include #include #include - +#include namespace DB { @@ -75,7 +75,7 @@ public: MergeTreeTransactionPtr txn, MergeTreeData * data_, MergeTreeDataMergerMutator * mutator_, - ActionBlocker * merges_blocker_, + PartitionActionBlocker * merges_blocker_, ActionBlocker * ttl_merges_blocker_) { global_ctx = std::make_shared(); @@ -148,7 +148,7 @@ private: MergeListElement * merge_list_element_ptr{nullptr}; MergeTreeData * data{nullptr}; MergeTreeDataMergerMutator * mutator{nullptr}; - ActionBlocker * merges_blocker{nullptr}; + PartitionActionBlocker * merges_blocker{nullptr}; ActionBlocker * ttl_merges_blocker{nullptr}; StorageSnapshotPtr storage_snapshot{nullptr}; StorageMetadataPtr metadata_snapshot{nullptr}; @@ -195,6 +195,10 @@ private: bool need_prefix; scope_guard temporary_directory_lock; + + // will throw an exception if merge was cancelled in any way. + void checkOperationIsNotCanceled() const; + bool isCancelled() const; }; using GlobalRuntimeContextPtr = std::shared_ptr; diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index f3a3f51b6c3..fd399551424 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -207,7 +207,7 @@ public : /** Is used to cancel all merges and mutations. On cancel() call all currently running actions will throw exception soon. * All new attempts to start a merge or mutation will throw an exception until all 'LockHolder' objects will be destroyed. */ - ActionBlocker merges_blocker; + PartitionActionBlocker merges_blocker; ActionBlocker ttl_merges_blocker; private: diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index a5b8a2a2a6d..cd363b820b7 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -934,7 +934,7 @@ struct MutationContext { MergeTreeData * data; MergeTreeDataMergerMutator * mutator; - ActionBlocker * merges_blocker; + PartitionActionBlocker * merges_blocker; TableLockHolder * holder; MergeListEntry * mutate_entry; @@ -997,11 +997,22 @@ struct MutationContext bool need_prefix = true; scope_guard temporary_directory_lock; + + bool checkOperationIsNotCanceled() const + { + if (merges_blocker->isCancelledForPartition(new_data_part->info.partition_id) + || (*mutate_entry)->is_cancelled) + { + throw Exception(ErrorCodes::ABORTED, "Cancelled mutating parts"); + } + + return true; + } + }; using MutationContextPtr = std::shared_ptr; - class MergeProjectionPartsTask : public IExecutableTask { public: @@ -1250,7 +1261,7 @@ void PartMergerWriter::prepare() bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { Block cur_block; - if (MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry) && ctx->mutating_executor->pull(cur_block)) + if (ctx->checkOperationIsNotCanceled() && ctx->mutating_executor->pull(cur_block)) { if (ctx->minmax_idx) ctx->minmax_idx->update(cur_block, ctx->data->getMinMaxColumnsNames(ctx->metadata_snapshot->getPartitionKey())); @@ -1925,7 +1936,7 @@ MutateTask::MutateTask( const MergeTreeTransactionPtr & txn, MergeTreeData & data_, MergeTreeDataMergerMutator & mutator_, - ActionBlocker & merges_blocker_, + PartitionActionBlocker & merges_blocker_, bool need_prefix_) : ctx(std::make_shared()) { @@ -1964,7 +1975,7 @@ bool MutateTask::execute() } case State::NEED_EXECUTE: { - MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry); + ctx->checkOperationIsNotCanceled(); if (task->executeStep()) return true; @@ -2047,7 +2058,7 @@ static bool canSkipMutationCommandForPart(const MergeTreeDataPartPtr & part, con bool MutateTask::prepare() { - MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry); + ctx->checkOperationIsNotCanceled(); if (ctx->future_part->parts.size() != 1) throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to mutate {} parts, not one. " diff --git a/src/Storages/MergeTree/MutateTask.h b/src/Storages/MergeTree/MutateTask.h index dc21df018d7..adc5d869d7b 100644 --- a/src/Storages/MergeTree/MutateTask.h +++ b/src/Storages/MergeTree/MutateTask.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -35,7 +36,7 @@ public: const MergeTreeTransactionPtr & txn, MergeTreeData & data_, MergeTreeDataMergerMutator & mutator_, - ActionBlocker & merges_blocker_, + PartitionActionBlocker & merges_blocker_, bool need_prefix_); bool execute(); diff --git a/src/Storages/MergeTree/PartitionActionBlocker.cpp b/src/Storages/MergeTree/PartitionActionBlocker.cpp new file mode 100644 index 00000000000..5922be8d3f7 --- /dev/null +++ b/src/Storages/MergeTree/PartitionActionBlocker.cpp @@ -0,0 +1,85 @@ +#include + +#include +#include +#include + +namespace DB +{ + +ActionLock PartitionActionBlocker::cancelForPartition(const std::string & partition_id) +{ + std::unique_lock lock(mutex); + const size_t prev_size = partition_blockers.size(); + + ActionLock result = partition_blockers[partition_id].cancel(); + + // Cleanup stale `ActionBlocker` instances once in a while, to prevent unbound growth. + if (prev_size != partition_blockers.size() && ++cleanup_counter > 32) // 32 is arbitrary. + compactPartitionBlockersLocked(); + + return result; +} + +bool PartitionActionBlocker::isCancelledForPartition(const std::string & partition_id) const +{ + if (isCancelled()) + return true; + + std::shared_lock lock(mutex); + return isCancelledForPartitionOnlyLocked(partition_id); +} + +bool PartitionActionBlocker::isCancelledForPartitionOnlyLocked(const std::string & partition_id) const +{ + auto p = partition_blockers.find(partition_id); + return p != partition_blockers.end() && p->second.isCancelled(); +} + +size_t PartitionActionBlocker::countPartitionBlockers() const +{ + std::shared_lock lock(mutex); + return partition_blockers.size(); +} + +void PartitionActionBlocker::compactPartitionBlockers() +{ + std::unique_lock lock(mutex); + + compactPartitionBlockersLocked(); +} + +void PartitionActionBlocker::compactPartitionBlockersLocked() +{ + std::erase_if(partition_blockers, [](const auto & p) + { + return !p.second.isCancelled(); + }); + cleanup_counter = 0; +} + +std::string PartitionActionBlocker::formatDebug() const +{ + std::shared_lock lock(mutex); + + WriteBufferFromOwnString out; + out << "Global lock: " << global_blocker.getCounter().load() + << "\n" + << partition_blockers.size() << " live partition locks: {"; + + size_t i = 0; + for (const auto & p : partition_blockers) + { + out << "\n\t" << DB::quote << p.first << " : " << p.second.getCounter().load(); + + if (++i < partition_blockers.size()) + out << ","; + else + out << "\n"; + } + out << "}"; + + return out.str(); +} + +} diff --git a/src/Storages/MergeTree/PartitionActionBlocker.h b/src/Storages/MergeTree/PartitionActionBlocker.h new file mode 100644 index 00000000000..5dd4e983c7a --- /dev/null +++ b/src/Storages/MergeTree/PartitionActionBlocker.h @@ -0,0 +1,68 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace DB +{ + +/// Locks actions on given MergeTree-family table. +/// Like, ActionBlocker, but has two modes: +/// - 'partition-specific' lock, @see `cancelForPartition()`, and `isCancelledForPartition()`. +/// Only actions on *specific partition* are prohibited to start +/// and must interrupt at firsts chance. +/// There could be arbitrary number of partitions locked simultaneously. +/// +/// - 'global' lock, @see `cancel()`, `isCancelled()`, and `cancelForever()` +/// Any new actions for *ALL* partitions are prohibited to start +/// and are required to interrupt at first possible moment. +/// As if all individual partitions were locked with 'partition-specific lock'. +/// +/// Any lock can be set multiple times and considered fully un-locked +/// only when it was un-locked same number of times (by destructing/resetting of `ActionLock`). +/// +/// There could be any number of locks active at given point on single ActionBlocker instance: +/// - 'global lock' locked `N` times. +/// - `M` 'partition lock's each locked different number of times. +class PartitionActionBlocker +{ +public: + PartitionActionBlocker() = default; + + bool isCancelled() const { return global_blocker.isCancelled(); } + bool isCancelledForPartition(const std::string & /*partition_id*/) const; + + /// Temporarily blocks corresponding actions (while the returned object is alive) + friend class ActionLock; + ActionLock cancel() { return ActionLock(global_blocker); } + ActionLock cancelForPartition(const std::string & partition_id); + + /// Cancel the actions forever. + void cancelForever() { global_blocker.cancelForever(); } + + const std::atomic & getCounter() const { return global_blocker.getCounter(); } + + size_t countPartitionBlockers() const; + void compactPartitionBlockers(); + + std::string formatDebug() const; + +private: + void compactPartitionBlockersLocked(); + bool isCancelledForPartitionOnlyLocked(const std::string & partition_id) const; + +private: + ActionBlocker global_blocker; + + mutable std::shared_mutex mutex; + std::unordered_map partition_blockers; + size_t cleanup_counter = 0; +}; + + +} diff --git a/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp b/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp new file mode 100644 index 00000000000..e3e93a0ef80 --- /dev/null +++ b/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp @@ -0,0 +1,405 @@ +#include + +#include +#include +#include + +#include + +using namespace DB; + +TEST(PartitionActionBlocker, TestDefaultConstructor) +{ + PartitionActionBlocker blocker; + + EXPECT_FALSE(blocker.isCancelled()); + // EXPECT_EQ(0, blocker.getCounter().load()); +} + +TEST(PartitionActionBlocker, TestCancelForever) +{ + PartitionActionBlocker blocker; + + blocker.cancelForever(); + EXPECT_TRUE(blocker.isCancelled()); + // EXPECT_EQ(1, blocker.getCounter().load()); +} + +TEST(PartitionActionBlocker, TestCancel) +{ + const std::string partition_id = "some partition id"; + PartitionActionBlocker blocker; + + { + auto lock = blocker.cancel(); + EXPECT_TRUE(blocker.isCancelled()); + // EXPECT_EQ(1, blocker.getCounter().load()); + } + // automatically un-cancelled on `lock` destruction + EXPECT_FALSE(blocker.isCancelled()); +} + +TEST(PartitionActionBlocker, TestCancelForPartition) +{ + const std::string partition_id = "some partition id"; + const std::string partition_id2 = "some other partition id"; + PartitionActionBlocker blocker; + + { + auto lock = blocker.cancelForPartition(partition_id); + + // blocker is not locked fully + EXPECT_FALSE(blocker.isCancelled()); + // EXPECT_EQ(0, blocker.getCounter().load()); + + // blocker reports that only partition is locked + EXPECT_TRUE(blocker.isCancelledForPartition(partition_id)); + + // doesn't affect other partitions + EXPECT_FALSE(blocker.isCancelledForPartition(partition_id2)); + } + + EXPECT_FALSE(blocker.isCancelled()); + EXPECT_FALSE(blocker.isCancelledForPartition(partition_id)); +} + +TEST(PartitionActionBlocker, TestCancelForTwoPartitions) +{ + const std::string partition_id1 = "some partition id"; + const std::string partition_id2 = "some other partition id"; + PartitionActionBlocker blocker; + + { + auto lock1 = blocker.cancelForPartition(partition_id1); + auto lock2 = blocker.cancelForPartition(partition_id2); + + // blocker is not locked fully + EXPECT_FALSE(blocker.isCancelled()); + // EXPECT_EQ(0, blocker.getCounter().load()); + + // blocker reports that both partitions are locked + EXPECT_TRUE(blocker.isCancelledForPartition(partition_id1)); + EXPECT_TRUE(blocker.isCancelledForPartition(partition_id2)); + } + + // blocker is not locked fully + EXPECT_FALSE(blocker.isCancelled()); + // EXPECT_EQ(0, blocker.getCounter().load()); + + // blocker reports that only partition is locked + EXPECT_FALSE(blocker.isCancelledForPartition(partition_id1)); +} + +TEST(PartitionActionBlocker, TestCancelForSamePartitionTwice) +{ + // Partition is unlocked only when all locks are destroyed. + + const std::string partition_id = "some partition id"; + const std::string partition_id2 = "some other partition id"; + + // Lock `partition_id` twice, make sure that global lock + // and other partitions are unaffected. + // Check that `partition_id` is unlocked only after both locks are destroyed. + + PartitionActionBlocker blocker; + + { + auto lock1 = blocker.cancelForPartition(partition_id); + { + auto lock2 = blocker.cancelForPartition(partition_id); + + EXPECT_FALSE(blocker.isCancelled()); + EXPECT_TRUE(blocker.isCancelledForPartition(partition_id)); + EXPECT_FALSE(blocker.isCancelledForPartition(partition_id2)); + } + + EXPECT_FALSE(blocker.isCancelled()); + EXPECT_TRUE(blocker.isCancelledForPartition(partition_id)); + EXPECT_FALSE(blocker.isCancelledForPartition(partition_id2)); + } + // All locks lifted + + EXPECT_FALSE(blocker.isCancelled()); + EXPECT_FALSE(blocker.isCancelledForPartition(partition_id)); +} + +TEST(PartitionActionBlocker, TestCancelAndThenCancelForPartition) +{ + // Partition is unlocked only when all locks are destroyed. + + const std::string partition_id = "some partition id"; + const std::string partition_id2 = "some other partition id"; + PartitionActionBlocker blocker; + + { + auto global_lock = blocker.cancel(); + + { + auto lock1 = blocker.cancelForPartition(partition_id); + EXPECT_TRUE(blocker.isCancelled()); + EXPECT_TRUE(blocker.isCancelledForPartition(partition_id)); + EXPECT_TRUE(blocker.isCancelledForPartition(partition_id2)); + } + + EXPECT_TRUE(blocker.isCancelled()); + EXPECT_TRUE(blocker.isCancelledForPartition(partition_id)); + EXPECT_TRUE(blocker.isCancelledForPartition(partition_id2)); + } + // All locks lifted + + EXPECT_FALSE(blocker.isCancelled()); + EXPECT_FALSE(blocker.isCancelledForPartition(partition_id)); + EXPECT_FALSE(blocker.isCancelledForPartition(partition_id2)); +} + + +TEST(PartitionActionBlocker, TestCancelForPartitionAndThenCancel) +{ + // Partition is unlocked only when all locks are destroyed. + + const std::string partition_id = "some partition id"; + const std::string partition_id2 = "some other partition id"; + PartitionActionBlocker blocker; + + { + auto lock1 = blocker.cancelForPartition(partition_id); + { + auto global_lock = blocker.cancel(); + + EXPECT_TRUE(blocker.isCancelled()); + EXPECT_TRUE(blocker.isCancelledForPartition(partition_id)); + EXPECT_TRUE(blocker.isCancelledForPartition(partition_id2)); + } + + // global_locked is 'no more', so only 1 partition is locked now. + EXPECT_FALSE(blocker.isCancelled()); + EXPECT_TRUE(blocker.isCancelledForPartition(partition_id)); + EXPECT_FALSE(blocker.isCancelledForPartition(partition_id2)); + } + // All locks lifted + + EXPECT_FALSE(blocker.isCancelled()); + EXPECT_FALSE(blocker.isCancelledForPartition(partition_id)); + EXPECT_FALSE(blocker.isCancelledForPartition(partition_id2)); +} + +TEST(PartitionActionBlocker, TestAutomaticCompactPartitionBlockers) +{ + const size_t partitions_count = 100; + const std::string partition_id = "some partition id"; + PartitionActionBlocker blocker; + + for (size_t i = 0; i < partitions_count; ++i) + { + blocker.cancelForPartition(partition_id + "_" + std::to_string(i)); + } + + // Automatic cleanup happens once in a while, 100 stale locks should trigger it. + EXPECT_LT(blocker.countPartitionBlockers(), partitions_count); +} + +TEST(PartitionActionBlocker, TestCompactPartitionBlockers) +{ + const size_t partitions_count = 100; + const std::string partition_id = "some partition id"; + PartitionActionBlocker blocker; + + for (size_t i = 0; i < partitions_count; ++i) + { + blocker.cancelForPartition(partition_id + "_" + std::to_string(i)); + } + // Manually cleanup all stale blockers (all blockers in this case). + blocker.compactPartitionBlockers(); + + EXPECT_EQ(0, blocker.countPartitionBlockers()); +} + +TEST(PartitionActionBlocker, TestCompactPartitionBlockersDoesntRemoveActiveBlockers) +{ + const size_t partitions_count = 100; + const std::string partition_id = "some partition id"; + PartitionActionBlocker blocker; + + auto lock_foo = blocker.cancelForPartition("FOO"); + for (size_t i = 0; i < partitions_count; ++i) + { + blocker.cancelForPartition(partition_id + "_" + std::to_string(i)); + } + auto lock_bar = blocker.cancelForPartition("BAR"); + + EXPECT_LT(2, blocker.countPartitionBlockers()); + + // Manually cleanup all stale blockers (all except held by lock_foo and lock_bar). + blocker.compactPartitionBlockers(); + + EXPECT_EQ(2, blocker.countPartitionBlockers()); +} + +TEST(PartitionActionBlocker, TestFormatDebug) +{ + // Do not validate contents, just make sure that something is printed out + + const size_t partitions_count = 100; + const std::string partition_id = "some partition id"; + PartitionActionBlocker blocker; + + auto global_lock = blocker.cancel(); + auto lock_foo = blocker.cancelForPartition("FOO"); + auto lock_foo2 = blocker.cancelForPartition("FOO"); + for (size_t i = 0; i < partitions_count; ++i) + { + blocker.cancelForPartition(partition_id + "_" + std::to_string(i)); + } + auto lock_bar = blocker.cancelForPartition("BAR"); + + EXPECT_NE("", blocker.formatDebug()); +} + +namespace std { +template +inline ostream & operator<<(ostream & ostr, const chrono::duration & d) { + + const auto seconds = d.count() * 1.0 * P::num / P::den; + return ostr << std::fixed << std::setprecision(1) << seconds << "s"; +} +} + +template <> +struct fmt::formatter : public fmt::formatter +{ + auto format(const std::thread::id & id, fmt::format_context& ctx) const + { + std::stringstream sstr; + sstr << id; + + return fmt::formatter::format(sstr.str(), ctx); + } +}; + +template +struct fmt::formatter> : public fmt::formatter +{ + auto format(const std::chrono::duration & duration, fmt::format_context& ctx) const + { + std::stringstream sstr; + sstr << duration; + + return fmt::formatter::format(sstr.str(), ctx); + } +}; + +// TODO: remove this before merging +TEST(PartitionActionBlocker, DISABLED_TestMultiThreadLocks) +{ + // Do not validate contents, just make sure that nothing crashes. + + const size_t WORKERS_COUNT = 100; + const size_t DEFAULT_ITERATIONS = 100'000; + const auto DEFAULT_DELAY = std::chrono::microseconds{100}; + + std::vector workers; + PartitionActionBlocker blocker; +//#define LOG_PRINT(...) (void)(0) +#define LOG_PRINT fmt::print + +#define LOG_JOB_START(JOB_NAME) LOG_PRINT(stderr, "!!! Starting a job {} that will take at least {} on thread {}\n", \ + JOB_NAME, \ + repetitions * delay, std::this_thread::get_id()) + + auto partition_locker = [&blocker](size_t worker_id, size_t repetitions, std::chrono::microseconds delay) + { + LOG_JOB_START("partition_locker"); + + const std::string partition_id = "partition_" + std::to_string(worker_id); + for (size_t i = 0; i < repetitions; ++i) + { + auto lock = blocker.cancelForPartition(partition_id); + if (i != repetitions-1) + std::this_thread::sleep_for(delay); + } + }; + + auto global_locker = [&blocker](size_t repetitions, std::chrono::microseconds delay) + { + LOG_JOB_START("global_locker"); + + for (size_t i = 0; i < repetitions; ++i) + { + auto lock = blocker.cancel(); + if (i != repetitions-1) + std::this_thread::sleep_for(delay); + } + }; + + auto try_execute_on_partition = [&blocker](size_t worker_id, auto action, size_t repetitions, std::chrono::microseconds delay) + { + LOG_JOB_START("try_execute_on_partition"); + + const std::string partition_id = "partition_" + std::to_string(worker_id); + for (size_t i = 0; i < repetitions; ++i) + { + if (!blocker.isCancelledForPartition(partition_id)) + { + action(partition_id); + } + if (i != repetitions-1) + std::this_thread::sleep_for(delay); + } + }; + + std::atomic successfull_partition_actions = 0; + + auto print_partition_id = [&successfull_partition_actions](const std::string & /*partition_id*/) + { + ++successfull_partition_actions; + //LOG_PRINT(stderr, "Got UNLOCKED partition {} on {}\n", partition_id, std::this_thread::get_id()); + }; + + auto print_debug = [&blocker](size_t repetitions, std::chrono::microseconds delay, const char * prefix = "") + { + if (repetitions > 1) + LOG_JOB_START("print_debug"); + + for (size_t i = 0; i < repetitions; ++i) + { + LOG_PRINT(stderr, "!!! {} Debug {} \n {}\n", prefix, i, blocker.formatDebug()); + + if (i != repetitions-1) + std::this_thread::sleep_for(delay); + } + }; + + for (size_t w = 0; w < WORKERS_COUNT; ++w) + { + workers.emplace_back(partition_locker, w % 100, DEFAULT_ITERATIONS, DEFAULT_DELAY); + } + + for (size_t w = 0; w < WORKERS_COUNT * 2; ++w) + { + workers.emplace_back(try_execute_on_partition, w % 100, print_partition_id, DEFAULT_ITERATIONS, DEFAULT_DELAY); + } + + for (size_t w = 0; w < 10; ++w) + { + workers.emplace_back(global_locker, DEFAULT_ITERATIONS / 10, DEFAULT_DELAY * 10); + } + + workers.emplace_back(print_debug, 5, std::chrono::seconds(1), "SCHEDULED"); + + std::this_thread::sleep_for(std::chrono::seconds(6)); + + LOG_PRINT(stderr, "!!! Joining {} threads\n", workers.size()); + + size_t t = 0; + for (auto & w : workers) + { + LOG_PRINT(stderr, "\tJoining thread #{} {} ...", t++, w.get_id()); + w.join(); + LOG_PRINT(stderr, "\tjoinied OK\n"); + + } + LOG_PRINT(stderr, "!!! Joined all {} threads", workers.size()); + std::cerr << successfull_partition_actions << " times seen unlocked partition" << std::endl; + + print_debug(1, std::chrono::microseconds{0}, "POST-JOIN"); +} diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index e15b308f084..3dc0dcf0b41 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -1138,7 +1138,7 @@ bool StorageMergeTree::merge( { std::unique_lock lock(currently_processing_in_background_mutex); - if (merger_mutator.merges_blocker.isCancelled()) + if (merger_mutator.merges_blocker.isCancelledForPartition(partition_id)) throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts"); merge_mutate_entry = selectPartsToMerge( @@ -1408,6 +1408,9 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign if (merge_entry) { + if (merger_mutator.merges_blocker.isCancelledForPartition(merge_entry->future_part->part_info.partition_id)) + return false; + auto task = std::make_shared(*this, metadata_snapshot, /* deduplicate */ false, Names{}, /* cleanup */ false, merge_entry, shared_lock, common_assignee_trigger); task->setCurrentTransaction(std::move(transaction_for_merge), std::move(txn)); bool scheduled = assignee.scheduleMergeMutateTask(task); @@ -1419,6 +1422,9 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign } if (mutate_entry) { + if (merger_mutator.merges_blocker.isCancelledForPartition(mutate_entry->future_part->part_info.partition_id)) + return false; + /// We take new metadata snapshot here. It's because mutation commands can be executed only with metadata snapshot /// which is equal or more fresh than commands themselves. In extremely rare case it can happen that we will have alter /// in between we took snapshot above and selected commands. That is why we take new snapshot here. @@ -1631,6 +1637,66 @@ bool StorageMergeTree::optimize( return true; } +namespace +{ + +size_t countOccurrences(const StorageMergeTree::DataParts & haystack, const DataPartsVector & needle) +{ + size_t total = 0; + for (const auto & n : needle) + total += haystack.count(n); + + return total; +} + +auto getNameWithState(const auto & parts) +{ + return std::views::transform(parts, [](const auto & p) + { + return p->getNameWithState(); + }); +} + +} + +// Same as stopMergesAndWait, but waits only for merges on parts belonging to a certain partition. +ActionLock StorageMergeTree::stopMergesAndWaitForPartition(String partition_id) +{ + LOG_DEBUG(log, "StorageMergeTree::stopMergesAndWaitForPartition partition_id: \"{}\"", partition_id); + /// Stop all merges and prevent new from starting, BUT unlike stopMergesAndWait(), only wait for the merges on small set of parts to finish. + + std::unique_lock lock(currently_processing_in_background_mutex); + + /// Asks to complete merges and does not allow them to start. + /// This protects against "revival" of data for a removed partition after completion of merge. + auto merge_blocker = merger_mutator.merges_blocker.cancelForPartition(partition_id); + + const DataPartsVector parts_to_wait = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, partition_id); + LOG_DEBUG(log, "StorageMergeTree::stopMergesAndWaitForPartition parts to wait: {} ({} items)", + fmt::join(getNameWithState(parts_to_wait), ", "), parts_to_wait.size()); + + LOG_DEBUG(log, "StorageMergeTree::stopMergesAndWaitForPartition all mutating parts: {} ({} items)", + fmt::join(getNameWithState(currently_merging_mutating_parts), ", "), currently_merging_mutating_parts.size()); + + // TODO allow to stop merges in specific partition only (like it's done in ReplicatedMergeTree) + + while (size_t still_merging = countOccurrences(currently_merging_mutating_parts, parts_to_wait)) + { + LOG_DEBUG(log, "StorageMergeTree::stopMergesAndWaitForPartition Waiting for currently running merges ({} {} parts are merging right now)", + fmt::join(getNameWithState(currently_merging_mutating_parts), ", "), still_merging); + + if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for( + lock, std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC))) + { + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout while waiting for already running merges"); + } + } + + LOG_DEBUG(log, "StorageMergeTree::stopMergesAndWaitForPartition done waiting, still merging {} ({} items)", + fmt::join(getNameWithState(currently_merging_mutating_parts), ", "), currently_merging_mutating_parts.size()); + return merge_blocker; +} + ActionLock StorageMergeTree::stopMergesAndWait() { /// TODO allow to stop merges in specific partition only (like it's done in ReplicatedMergeTree) @@ -2069,10 +2135,14 @@ PartitionCommandsResultInfo StorageMergeTree::attachPartition( void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr local_context) { assertNotReadonly(); + LOG_DEBUG(log, "StorageMergeTree::replacePartitionFrom\tsource_table: {}, replace: {}", source_table->getStorageID().getShortName(), replace); auto lock1 = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout); auto lock2 = source_table->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout); - auto merges_blocker = stopMergesAndWait(); + + const String partition_id = getPartitionIDFromQuery(partition, local_context); + auto merges_blocker = stopMergesAndWaitForPartition(partition_id); + auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr(); auto my_metadata_snapshot = getInMemoryMetadataPtr(); @@ -2080,8 +2150,6 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con ProfileEventsScope profile_events_scope; MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, my_metadata_snapshot); - String partition_id = getPartitionIDFromQuery(partition, local_context); - DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id); MutableDataPartsVector dst_parts; std::vector dst_parts_locks; diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index c384a391291..cd307cde666 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -185,6 +185,7 @@ private: /// If not force, then take merges selector and check that part is not participating in background operations. MergeTreeDataPartPtr outdatePart(MergeTreeTransaction * txn, const String & part_name, bool force, bool clear_without_timeout = true); ActionLock stopMergesAndWait(); + ActionLock stopMergesAndWaitForPartition(String partition_id); /// Allocate block number for new mutation, write mutation to disk /// and into in-memory structures. Wake up merge-mutation task. diff --git a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.reference b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.reference new file mode 100644 index 00000000000..6cf29d3233a --- /dev/null +++ b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.reference @@ -0,0 +1,49 @@ +-- { echo } + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1 +( + `p` UInt8, + `i` UInt64 +) +ENGINE = MergeTree +PARTITION BY p +ORDER BY tuple(); +INSERT INTO t1 VALUES (1, 1), (2, 2); +SYSTEM STOP MERGES t1; +-- Creating new parts +insert into t1 (p, i) select 1, number from numbers(100); +ALTER TABLE t1 ADD COLUMN make_merge_slower UInt8 DEFAULT sleepEachRow(0.03); +CREATE TABLE t2 AS t1; +INSERT INTO t2 VALUES (2, 2000, 1); +-- expecting 2 parts for partition '1' +SELECT partition, count(partition) FROM system.parts WHERE database==currentDatabase() AND table=='t1' AND active=='1' GROUP BY partition ORDER BY partition; +1 2 +2 1 +SYSTEM START MERGES t1; +starting merge on t1 partiton 1 by OPTIMIZE DEDUPLICATEing in background process +Give server a moment to start merge +-- { echo } +-- assume merge started +SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1'; +0 1 +-- Should complete right away since there are no merges on partition t2 +ALTER TABLE t1 REPLACE PARTITION id '2' FROM t2; +SELECT * FROM t1 WHERE p=2; +2 2000 1 +-- expect merge is still running +SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1'; +0 1 +-- Expecting that merge hasn't finished yet (since ALTER TABLE .. REPLACE wasn't waiting for it), +-- and there are lots of unduplicated rows +SELECT count(*) FROM t1 WHERE p=1; +101 +Merging done +-- { echo } +-- check that merge is finished +SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1'; +-- Expecting that merge finished +SELECT * FROM t1 ORDER BY p; +1 1 0 +2 2000 1 diff --git a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh new file mode 100755 index 00000000000..f8119d1c23f --- /dev/null +++ b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh @@ -0,0 +1,85 @@ +#!/usr/bin/env bash + +# https://github.com/ClickHouse/ClickHouse/issues/45328 +# Check that replacing one partition on a table with `ALTER TABLE REPLACE PARTITION` +# doesn't wait for merges on other partitions. + +# Manually start merge (with `OPTIMIZE DEDUPLICATE`) on partition 1, +# and at the same time, do `REPLACE PARTITION` on partition 2. + +# This is a sh-test only to be able to start `OPTIMIZE DEDUPLICATE` in the background. + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -nm <<'EOF' +-- { echo } + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +CREATE TABLE t1 +( + `p` UInt8, + `i` UInt64 +) +ENGINE = MergeTree +PARTITION BY p +ORDER BY tuple(); + +INSERT INTO t1 VALUES (1, 1), (2, 2); + +SYSTEM STOP MERGES t1; + +-- Creating new parts +insert into t1 (p, i) select 1, number from numbers(100); +ALTER TABLE t1 ADD COLUMN make_merge_slower UInt8 DEFAULT sleepEachRow(0.03); + +CREATE TABLE t2 AS t1; +INSERT INTO t2 VALUES (2, 2000, 1); + +-- expecting 2 parts for partition '1' +SELECT partition, count(partition) FROM system.parts WHERE database==currentDatabase() AND table=='t1' AND active=='1' GROUP BY partition ORDER BY partition; + +SYSTEM START MERGES t1; +EOF + +echo Starting merge on t1 partition '1' by \'OPTIMIZE DEDUPLICATE\'ing in a background process +# Optimize deduplicate does merge, which is supposed to take some time +# Since this is a synchronous operation, starting it in the background. +$CLICKHOUSE_CLIENT -nq "OPTIMIZE TABLE t1 PARTITION id '1' DEDUPLICATE BY p;" & +merge_pid=$! + +sleep 0.1 && echo "Give the server a moment to start merge" +$CLICKHOUSE_CLIENT -nm <<'EOF' +-- { echo } +-- assume merge started +SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1'; + + +-- Should complete right away since there are no merges on partition t2 +ALTER TABLE t1 REPLACE PARTITION id '2' FROM t2; +SELECT * FROM t1 WHERE p=2; + + +-- Expecting that merge is still running +SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1'; + +-- Expecting that merge hasn't finished yet (since ALTER TABLE .. REPLACE wasn't waiting for it), +-- and there are lots of unduplicated rows +SELECT count(*) FROM t1 WHERE p=1; +EOF + + +# TODO: remove before merging PR +wait ${merge_pid} && echo 'Merging done' + +$CLICKHOUSE_CLIENT -nm <<'EOF' +-- { echo } +-- check that merge is finished +SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1'; + +-- Expecting that merge finished +SELECT * FROM t1 ORDER BY p; +EOF diff --git a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.reference b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.reference new file mode 100644 index 00000000000..8e9610399e0 --- /dev/null +++ b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.reference @@ -0,0 +1,33 @@ +-- https://github.com/ClickHouse/ClickHouse/issues/45328 +-- Check that replacing one partition on a table with `ALTER TABLE REPLACE PARTITION` +-- doesn't wait for mutations on other partitions. +-- { echo } + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1 +( + `p` UInt8, + `i` UInt64 +) +ENGINE = MergeTree +PARTITION BY p +ORDER BY tuple(); +INSERT INTO t1 VALUES (1, 1), (2, 2); +CREATE TABLE t2 AS t1; +INSERT INTO t2 VALUES (2, 2000); +-- mutation that is supposed to be running in bg while REPLACE is performed. +-- sleep(1) is arbitrary here, we just need mutation to take long enough to be noticeable. +ALTER TABLE t1 UPDATE i = if(sleep(1), 0, 9000) IN PARTITION id '1' WHERE p == 1; +-- check that mutation is started +SELECT is_done, latest_fail_reason, parts_to_do FROM system.mutations WHERE database==currentDatabase() AND table=='t1'; +0 2 +ALTER TABLE t1 REPLACE PARTITION id '2' FROM t2; +-- check that mutation is still running +SELECT is_done, latest_fail_reason, parts_to_do FROM system.mutations WHERE database==currentDatabase() AND table=='t1'; +0 1 +-- Expecting that mutation hasn't finished yet (since ALTER TABLE .. REPLACE wasn't waiting for it), +-- so row with `p == 1` still has the old value of `i == 1`. +SELECT * FROM t1 ORDER BY p; +1 1 +2 2000 diff --git a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.sql b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.sql new file mode 100644 index 00000000000..bacd906f7d2 --- /dev/null +++ b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.sql @@ -0,0 +1,36 @@ +-- https://github.com/ClickHouse/ClickHouse/issues/45328 +-- Check that replacing one partition on a table with `ALTER TABLE REPLACE PARTITION` +-- doesn't wait for mutations on other partitions. +-- { echo } + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +CREATE TABLE t1 +( + `p` UInt8, + `i` UInt64 +) +ENGINE = MergeTree +PARTITION BY p +ORDER BY tuple(); + +INSERT INTO t1 VALUES (1, 1), (2, 2); + +CREATE TABLE t2 AS t1; +INSERT INTO t2 VALUES (2, 2000); + +-- mutation that is supposed to be running in bg while REPLACE is performed. +-- sleep(1) is arbitrary here, we just need mutation to take long enough to be noticeable. +ALTER TABLE t1 UPDATE i = if(sleep(1), 0, 9000) IN PARTITION id '1' WHERE p == 1; +-- check that mutation is started +SELECT is_done, latest_fail_reason, parts_to_do FROM system.mutations WHERE database==currentDatabase() AND table=='t1'; + +ALTER TABLE t1 REPLACE PARTITION id '2' FROM t2; + +-- check that mutation is still running +SELECT is_done, latest_fail_reason, parts_to_do FROM system.mutations WHERE database==currentDatabase() AND table=='t1'; + +-- Expecting that mutation hasn't finished yet (since ALTER TABLE .. REPLACE wasn't waiting for it), +-- so row with `p == 1` still has the old value of `i == 1`. +SELECT * FROM t1 ORDER BY p; From a0ade7efc0f4e7151ba0dce66b23d3e2848a3449 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Wed, 13 Mar 2024 11:29:14 +0000 Subject: [PATCH 02/17] Fixed build issues --- src/Storages/MergeTree/MutateTask.cpp | 12 ++++++------ .../tests/gtest_partition_action_blocker.cpp | 10 ++++++---- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index cd363b820b7..6333703aa38 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -52,13 +52,13 @@ namespace ErrorCodes namespace MutationHelpers { -static bool checkOperationIsNotCanceled(ActionBlocker & merges_blocker, MergeListEntry * mutate_entry) -{ - if (merges_blocker.isCancelled() || (*mutate_entry)->is_cancelled) - throw Exception(ErrorCodes::ABORTED, "Cancelled mutating parts"); +// static bool checkOperationIsNotCanceled(ActionBlocker & merges_blocker, MergeListEntry * mutate_entry) +// { +// if (merges_blocker.isCancelled() || (*mutate_entry)->is_cancelled) +// throw Exception(ErrorCodes::ABORTED, "Cancelled mutating parts"); - return true; -} +// return true; +// } /** Split mutation commands into two parts: * First part should be executed by mutations interpreter. diff --git a/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp b/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp index e3e93a0ef80..4c6e0036250 100644 --- a/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp +++ b/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp @@ -6,6 +6,8 @@ #include +#include + using namespace DB; TEST(PartitionActionBlocker, TestDefaultConstructor) @@ -347,11 +349,11 @@ TEST(PartitionActionBlocker, DISABLED_TestMultiThreadLocks) } }; - std::atomic successfull_partition_actions = 0; + std::atomic successful_partition_actions = 0; - auto print_partition_id = [&successfull_partition_actions](const std::string & /*partition_id*/) + auto print_partition_id = [&successful_partition_actions](const std::string & /*partition_id*/) { - ++successfull_partition_actions; + ++successful_partition_actions; //LOG_PRINT(stderr, "Got UNLOCKED partition {} on {}\n", partition_id, std::this_thread::get_id()); }; @@ -399,7 +401,7 @@ TEST(PartitionActionBlocker, DISABLED_TestMultiThreadLocks) } LOG_PRINT(stderr, "!!! Joined all {} threads", workers.size()); - std::cerr << successfull_partition_actions << " times seen unlocked partition" << std::endl; + std::cerr << successful_partition_actions << " times seen unlocked partition" << std::endl; print_debug(1, std::chrono::microseconds{0}, "POST-JOIN"); } From 8ace74e4b2ad4791055db11bac300dcb8db0feb2 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Fri, 29 Mar 2024 19:26:15 +0000 Subject: [PATCH 03/17] Fixed crash caused by dereferencing nullptr. --- src/Storages/MergeTree/MergeTask.cpp | 3 +-- src/Storages/MergeTree/MutateTask.cpp | 2 +- src/Storages/StorageMergeTree.cpp | 11 +++++++++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 719a2d97c9a..4ec14b518e1 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -130,7 +130,7 @@ static void addMissedColumnsToSerializationInfos( bool MergeTask::GlobalRuntimeContext::isCancelled() const { - return merges_blocker->isCancelledForPartition(future_part->part_info.partition_id) + return (future_part ? merges_blocker->isCancelledForPartition(future_part->part_info.partition_id) : merges_blocker->isCancelled()) || merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed); } @@ -407,7 +407,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() merge_list_element = global_ctx->merge_list_element_ptr, partition_id = global_ctx->future_part->part_info.partition_id]() -> bool { - // TODO(vnemkov): we might want capture global_ctx here and use it's `isCancelled` method, but I'm not sure about object lifetimes. return merges_blocker->isCancelledForPartition(partition_id) || (need_remove && ttl_merges_blocker->isCancelled()) || merge_list_element->is_cancelled.load(std::memory_order_relaxed); diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 6333703aa38..0feb0e21763 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1000,7 +1000,7 @@ struct MutationContext bool checkOperationIsNotCanceled() const { - if (merges_blocker->isCancelledForPartition(new_data_part->info.partition_id) + if (new_data_part ? merges_blocker->isCancelledForPartition(new_data_part->info.partition_id) : merges_blocker->isCancelled() || (*mutate_entry)->is_cancelled) { throw Exception(ErrorCodes::ABORTED, "Cancelled mutating parts"); diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 3dc0dcf0b41..83d982235ca 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -1406,9 +1406,16 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign has_mutations = !current_mutations_by_version.empty(); } + auto isCancelled = [&merges_blocker = merger_mutator.merges_blocker] (const MergeMutateSelectedEntryPtr & entry) { + if (entry->future_part) + return merges_blocker.isCancelledForPartition(entry->future_part->part_info.partition_id); + + return merges_blocker.isCancelled(); + }; + if (merge_entry) { - if (merger_mutator.merges_blocker.isCancelledForPartition(merge_entry->future_part->part_info.partition_id)) + if (isCancelled(merge_entry)) return false; auto task = std::make_shared(*this, metadata_snapshot, /* deduplicate */ false, Names{}, /* cleanup */ false, merge_entry, shared_lock, common_assignee_trigger); @@ -1422,7 +1429,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign } if (mutate_entry) { - if (merger_mutator.merges_blocker.isCancelledForPartition(mutate_entry->future_part->part_info.partition_id)) + if (isCancelled(mutate_entry)) return false; /// We take new metadata snapshot here. It's because mutation commands can be executed only with metadata snapshot From 345011385f5ee1d25467779dad23659109b92f0d Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Fri, 29 Mar 2024 21:19:36 +0000 Subject: [PATCH 04/17] Fixed style issue --- src/Storages/StorageMergeTree.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index c1b492909b9..d72afc33ac1 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -1405,7 +1405,8 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign has_mutations = !current_mutations_by_version.empty(); } - auto isCancelled = [&merges_blocker = merger_mutator.merges_blocker] (const MergeMutateSelectedEntryPtr & entry) { + auto isCancelled = [&merges_blocker = merger_mutator.merges_blocker](const MergeMutateSelectedEntryPtr & entry) + { if (entry->future_part) return merges_blocker.isCancelledForPartition(entry->future_part->part_info.partition_id); From e0673c50143b0bf4b830eead4fc36ff71d99f551 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Mon, 1 Apr 2024 16:11:52 +0000 Subject: [PATCH 05/17] Updated .reference to match changes in the test --- ...ion_do_not_wait_merges_on_unrelated_partitions.reference | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.reference b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.reference index 6cf29d3233a..178ea0a6562 100644 --- a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.reference +++ b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.reference @@ -22,8 +22,8 @@ SELECT partition, count(partition) FROM system.parts WHERE database==currentData 1 2 2 1 SYSTEM START MERGES t1; -starting merge on t1 partiton 1 by OPTIMIZE DEDUPLICATEing in background process -Give server a moment to start merge +Starting merge on t1 partition 1 by 'OPTIMIZE DEDUPLICATE'ing in a background process +Give the server a moment to start merge -- { echo } -- assume merge started SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1'; @@ -32,7 +32,7 @@ SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatab ALTER TABLE t1 REPLACE PARTITION id '2' FROM t2; SELECT * FROM t1 WHERE p=2; 2 2000 1 --- expect merge is still running +-- Expecting that merge is still running SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1'; 0 1 -- Expecting that merge hasn't finished yet (since ALTER TABLE .. REPLACE wasn't waiting for it), From 8a31162f470f4c2017f9b17229bce0a99d2e1309 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 2 Apr 2024 11:38:45 +0000 Subject: [PATCH 06/17] Fixed binary_tidy build fixes readability-redundant-access-specifiers warning --- src/Storages/MergeTree/PartitionActionBlocker.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Storages/MergeTree/PartitionActionBlocker.h b/src/Storages/MergeTree/PartitionActionBlocker.h index 5dd4e983c7a..ebfb10fd732 100644 --- a/src/Storages/MergeTree/PartitionActionBlocker.h +++ b/src/Storages/MergeTree/PartitionActionBlocker.h @@ -56,7 +56,6 @@ private: void compactPartitionBlockersLocked(); bool isCancelledForPartitionOnlyLocked(const std::string & partition_id) const; -private: ActionBlocker global_blocker; mutable std::shared_mutex mutex; From 615b8265dc38c50c3772e8bc12150f2ad356997d Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Wed, 3 Apr 2024 13:29:23 +0000 Subject: [PATCH 07/17] Less flaky test --- ...on_do_not_wait_mutations_on_unrelated_partitions.reference | 4 ++-- ...artition_do_not_wait_mutations_on_unrelated_partitions.sql | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.reference b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.reference index 8e9610399e0..269a51c7bcf 100644 --- a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.reference +++ b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.reference @@ -20,8 +20,8 @@ INSERT INTO t2 VALUES (2, 2000); -- sleep(1) is arbitrary here, we just need mutation to take long enough to be noticeable. ALTER TABLE t1 UPDATE i = if(sleep(1), 0, 9000) IN PARTITION id '1' WHERE p == 1; -- check that mutation is started -SELECT is_done, latest_fail_reason, parts_to_do FROM system.mutations WHERE database==currentDatabase() AND table=='t1'; -0 2 +SELECT not(is_done) as is_running FROM system.mutations WHERE database==currentDatabase() AND table=='t1'; +1 ALTER TABLE t1 REPLACE PARTITION id '2' FROM t2; -- check that mutation is still running SELECT is_done, latest_fail_reason, parts_to_do FROM system.mutations WHERE database==currentDatabase() AND table=='t1'; diff --git a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.sql b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.sql index bacd906f7d2..8adcc3a59f6 100644 --- a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.sql +++ b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.sql @@ -24,7 +24,7 @@ INSERT INTO t2 VALUES (2, 2000); -- sleep(1) is arbitrary here, we just need mutation to take long enough to be noticeable. ALTER TABLE t1 UPDATE i = if(sleep(1), 0, 9000) IN PARTITION id '1' WHERE p == 1; -- check that mutation is started -SELECT is_done, latest_fail_reason, parts_to_do FROM system.mutations WHERE database==currentDatabase() AND table=='t1'; +SELECT not(is_done) as is_running FROM system.mutations WHERE database==currentDatabase() AND table=='t1'; ALTER TABLE t1 REPLACE PARTITION id '2' FROM t2; From e80e29c1bc8a388168b5a0bfb36cd43451049874 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Fri, 21 Jun 2024 13:17:20 +0000 Subject: [PATCH 08/17] try make test not flaky --- ...ion_do_not_wait_merges_on_unrelated_partitions.sh | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh index f8119d1c23f..1a4267fa346 100755 --- a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh +++ b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh @@ -51,7 +51,17 @@ echo Starting merge on t1 partition '1' by \'OPTIMIZE DEDUPLICATE\'ing in a back $CLICKHOUSE_CLIENT -nq "OPTIMIZE TABLE t1 PARTITION id '1' DEDUPLICATE BY p;" & merge_pid=$! -sleep 0.1 && echo "Give the server a moment to start merge" +echo "Give the server a moment to start merge" + +for attempt in {1..50} +do + part_info=$($CLICKHOUSE_CLIENT -nq "SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1' FORMAT CSV") + if [[ "$part_info" == "0,\"1\"" ]]; then + break; + fi + sleep 0.1 +done + $CLICKHOUSE_CLIENT -nm <<'EOF' -- { echo } -- assume merge started From 6b8f80a6bc0350d2de2d442c32989d4ca78752c8 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Fri, 21 Jun 2024 14:57:55 +0000 Subject: [PATCH 09/17] fix shellcheck in test --- ...lace_partition_do_not_wait_merges_on_unrelated_partitions.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh index 1a4267fa346..034dcb989a6 100755 --- a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh +++ b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh @@ -53,7 +53,7 @@ merge_pid=$! echo "Give the server a moment to start merge" -for attempt in {1..50} +for _ in {1..50} do part_info=$($CLICKHOUSE_CLIENT -nq "SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1' FORMAT CSV") if [[ "$part_info" == "0,\"1\"" ]]; then From 3870072aa3114f039f09ac3770c6c2b5049a1a38 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Fri, 21 Jun 2024 16:09:09 +0000 Subject: [PATCH 10/17] remove unused function --- src/Storages/MergeTree/MutateTask.cpp | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 73431cd3c65..83838147efd 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -54,14 +54,6 @@ namespace ErrorCodes namespace MutationHelpers { -static bool checkOperationIsNotCanceled(ActionBlocker & merges_blocker, MergeListEntry * mutate_entry) -{ - if (merges_blocker.isCancelled() || (*mutate_entry)->is_cancelled) - throw Exception(ErrorCodes::ABORTED, "Cancelled mutating parts"); - - return true; -} - static bool haveMutationsOfDynamicColumns(const MergeTreeData::DataPartPtr & data_part, const MutationCommands & commands) { for (const auto & command : commands) From 4f1df2baefb94d13904935f5bb1c53be4e6cf5b8 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Fri, 21 Jun 2024 18:50:27 +0000 Subject: [PATCH 11/17] remove debug output from unittest --- .../tests/gtest_partition_action_blocker.cpp | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp b/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp index 4c6e0036250..477199ef25f 100644 --- a/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp +++ b/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp @@ -29,7 +29,7 @@ TEST(PartitionActionBlocker, TestCancelForever) TEST(PartitionActionBlocker, TestCancel) { - const std::string partition_id = "some partition id"; +// const std::string partition_id = "some partition id"; PartitionActionBlocker blocker; { @@ -257,15 +257,6 @@ TEST(PartitionActionBlocker, TestFormatDebug) EXPECT_NE("", blocker.formatDebug()); } -namespace std { -template -inline ostream & operator<<(ostream & ostr, const chrono::duration & d) { - - const auto seconds = d.count() * 1.0 * P::num / P::den; - return ostr << std::fixed << std::setprecision(1) << seconds << "s"; -} -} - template <> struct fmt::formatter : public fmt::formatter { From 77189321f638c821baf8fe30bcf7d8b46fe78299 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Mon, 24 Jun 2024 08:11:06 +0000 Subject: [PATCH 12/17] remove debug gtest --- .../tests/gtest_partition_action_blocker.cpp | 140 ------------------ 1 file changed, 140 deletions(-) diff --git a/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp b/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp index 477199ef25f..75217a77875 100644 --- a/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp +++ b/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp @@ -256,143 +256,3 @@ TEST(PartitionActionBlocker, TestFormatDebug) EXPECT_NE("", blocker.formatDebug()); } - -template <> -struct fmt::formatter : public fmt::formatter -{ - auto format(const std::thread::id & id, fmt::format_context& ctx) const - { - std::stringstream sstr; - sstr << id; - - return fmt::formatter::format(sstr.str(), ctx); - } -}; - -template -struct fmt::formatter> : public fmt::formatter -{ - auto format(const std::chrono::duration & duration, fmt::format_context& ctx) const - { - std::stringstream sstr; - sstr << duration; - - return fmt::formatter::format(sstr.str(), ctx); - } -}; - -// TODO: remove this before merging -TEST(PartitionActionBlocker, DISABLED_TestMultiThreadLocks) -{ - // Do not validate contents, just make sure that nothing crashes. - - const size_t WORKERS_COUNT = 100; - const size_t DEFAULT_ITERATIONS = 100'000; - const auto DEFAULT_DELAY = std::chrono::microseconds{100}; - - std::vector workers; - PartitionActionBlocker blocker; -//#define LOG_PRINT(...) (void)(0) -#define LOG_PRINT fmt::print - -#define LOG_JOB_START(JOB_NAME) LOG_PRINT(stderr, "!!! Starting a job {} that will take at least {} on thread {}\n", \ - JOB_NAME, \ - repetitions * delay, std::this_thread::get_id()) - - auto partition_locker = [&blocker](size_t worker_id, size_t repetitions, std::chrono::microseconds delay) - { - LOG_JOB_START("partition_locker"); - - const std::string partition_id = "partition_" + std::to_string(worker_id); - for (size_t i = 0; i < repetitions; ++i) - { - auto lock = blocker.cancelForPartition(partition_id); - if (i != repetitions-1) - std::this_thread::sleep_for(delay); - } - }; - - auto global_locker = [&blocker](size_t repetitions, std::chrono::microseconds delay) - { - LOG_JOB_START("global_locker"); - - for (size_t i = 0; i < repetitions; ++i) - { - auto lock = blocker.cancel(); - if (i != repetitions-1) - std::this_thread::sleep_for(delay); - } - }; - - auto try_execute_on_partition = [&blocker](size_t worker_id, auto action, size_t repetitions, std::chrono::microseconds delay) - { - LOG_JOB_START("try_execute_on_partition"); - - const std::string partition_id = "partition_" + std::to_string(worker_id); - for (size_t i = 0; i < repetitions; ++i) - { - if (!blocker.isCancelledForPartition(partition_id)) - { - action(partition_id); - } - if (i != repetitions-1) - std::this_thread::sleep_for(delay); - } - }; - - std::atomic successful_partition_actions = 0; - - auto print_partition_id = [&successful_partition_actions](const std::string & /*partition_id*/) - { - ++successful_partition_actions; - //LOG_PRINT(stderr, "Got UNLOCKED partition {} on {}\n", partition_id, std::this_thread::get_id()); - }; - - auto print_debug = [&blocker](size_t repetitions, std::chrono::microseconds delay, const char * prefix = "") - { - if (repetitions > 1) - LOG_JOB_START("print_debug"); - - for (size_t i = 0; i < repetitions; ++i) - { - LOG_PRINT(stderr, "!!! {} Debug {} \n {}\n", prefix, i, blocker.formatDebug()); - - if (i != repetitions-1) - std::this_thread::sleep_for(delay); - } - }; - - for (size_t w = 0; w < WORKERS_COUNT; ++w) - { - workers.emplace_back(partition_locker, w % 100, DEFAULT_ITERATIONS, DEFAULT_DELAY); - } - - for (size_t w = 0; w < WORKERS_COUNT * 2; ++w) - { - workers.emplace_back(try_execute_on_partition, w % 100, print_partition_id, DEFAULT_ITERATIONS, DEFAULT_DELAY); - } - - for (size_t w = 0; w < 10; ++w) - { - workers.emplace_back(global_locker, DEFAULT_ITERATIONS / 10, DEFAULT_DELAY * 10); - } - - workers.emplace_back(print_debug, 5, std::chrono::seconds(1), "SCHEDULED"); - - std::this_thread::sleep_for(std::chrono::seconds(6)); - - LOG_PRINT(stderr, "!!! Joining {} threads\n", workers.size()); - - size_t t = 0; - for (auto & w : workers) - { - LOG_PRINT(stderr, "\tJoining thread #{} {} ...", t++, w.get_id()); - w.join(); - LOG_PRINT(stderr, "\tjoinied OK\n"); - - } - LOG_PRINT(stderr, "!!! Joined all {} threads", workers.size()); - std::cerr << successful_partition_actions << " times seen unlocked partition" << std::endl; - - print_debug(1, std::chrono::microseconds{0}, "POST-JOIN"); -} From 7c4dd00662aba32b1514dc23fba34faab127a2ab Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Mon, 24 Jun 2024 13:19:44 +0000 Subject: [PATCH 13/17] remove unused variables --- src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp b/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp index 75217a77875..45a326e3059 100644 --- a/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp +++ b/src/Storages/MergeTree/tests/gtest_partition_action_blocker.cpp @@ -29,7 +29,6 @@ TEST(PartitionActionBlocker, TestCancelForever) TEST(PartitionActionBlocker, TestCancel) { -// const std::string partition_id = "some partition id"; PartitionActionBlocker blocker; { From c04bbdf1917e20449dc03a27b268c6ce3de13ee6 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Mon, 24 Jun 2024 15:16:13 +0000 Subject: [PATCH 14/17] remove unused variables(2) --- src/Common/tests/gtest_action_blocker.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Common/tests/gtest_action_blocker.cpp b/src/Common/tests/gtest_action_blocker.cpp index 687f77da679..f077d5e73dd 100644 --- a/src/Common/tests/gtest_action_blocker.cpp +++ b/src/Common/tests/gtest_action_blocker.cpp @@ -31,7 +31,6 @@ TEST(ActionBlocker, TestCancelForever) TEST(ActionBlocker, TestCancel) { - const std::string partition_id = "some partition id"; ActionBlocker blocker; { From b323a212f639fcc8f9b63c42356c8f6859d2b201 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Fri, 5 Jul 2024 11:25:15 +0000 Subject: [PATCH 15/17] remove excessive test --- ...t_merges_on_unrelated_partitions.reference | 49 ---------- ...not_wait_merges_on_unrelated_partitions.sh | 95 ------------------- 2 files changed, 144 deletions(-) delete mode 100644 tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.reference delete mode 100755 tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh diff --git a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.reference b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.reference deleted file mode 100644 index 178ea0a6562..00000000000 --- a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.reference +++ /dev/null @@ -1,49 +0,0 @@ --- { echo } - -DROP TABLE IF EXISTS t1; -DROP TABLE IF EXISTS t2; -CREATE TABLE t1 -( - `p` UInt8, - `i` UInt64 -) -ENGINE = MergeTree -PARTITION BY p -ORDER BY tuple(); -INSERT INTO t1 VALUES (1, 1), (2, 2); -SYSTEM STOP MERGES t1; --- Creating new parts -insert into t1 (p, i) select 1, number from numbers(100); -ALTER TABLE t1 ADD COLUMN make_merge_slower UInt8 DEFAULT sleepEachRow(0.03); -CREATE TABLE t2 AS t1; -INSERT INTO t2 VALUES (2, 2000, 1); --- expecting 2 parts for partition '1' -SELECT partition, count(partition) FROM system.parts WHERE database==currentDatabase() AND table=='t1' AND active=='1' GROUP BY partition ORDER BY partition; -1 2 -2 1 -SYSTEM START MERGES t1; -Starting merge on t1 partition 1 by 'OPTIMIZE DEDUPLICATE'ing in a background process -Give the server a moment to start merge --- { echo } --- assume merge started -SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1'; -0 1 --- Should complete right away since there are no merges on partition t2 -ALTER TABLE t1 REPLACE PARTITION id '2' FROM t2; -SELECT * FROM t1 WHERE p=2; -2 2000 1 --- Expecting that merge is still running -SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1'; -0 1 --- Expecting that merge hasn't finished yet (since ALTER TABLE .. REPLACE wasn't waiting for it), --- and there are lots of unduplicated rows -SELECT count(*) FROM t1 WHERE p=1; -101 -Merging done --- { echo } --- check that merge is finished -SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1'; --- Expecting that merge finished -SELECT * FROM t1 ORDER BY p; -1 1 0 -2 2000 1 diff --git a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh deleted file mode 100755 index 034dcb989a6..00000000000 --- a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_merges_on_unrelated_partitions.sh +++ /dev/null @@ -1,95 +0,0 @@ -#!/usr/bin/env bash - -# https://github.com/ClickHouse/ClickHouse/issues/45328 -# Check that replacing one partition on a table with `ALTER TABLE REPLACE PARTITION` -# doesn't wait for merges on other partitions. - -# Manually start merge (with `OPTIMIZE DEDUPLICATE`) on partition 1, -# and at the same time, do `REPLACE PARTITION` on partition 2. - -# This is a sh-test only to be able to start `OPTIMIZE DEDUPLICATE` in the background. - -CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -# shellcheck source=../shell_config.sh -. "$CURDIR"/../shell_config.sh - -$CLICKHOUSE_CLIENT -nm <<'EOF' --- { echo } - -DROP TABLE IF EXISTS t1; -DROP TABLE IF EXISTS t2; - -CREATE TABLE t1 -( - `p` UInt8, - `i` UInt64 -) -ENGINE = MergeTree -PARTITION BY p -ORDER BY tuple(); - -INSERT INTO t1 VALUES (1, 1), (2, 2); - -SYSTEM STOP MERGES t1; - --- Creating new parts -insert into t1 (p, i) select 1, number from numbers(100); -ALTER TABLE t1 ADD COLUMN make_merge_slower UInt8 DEFAULT sleepEachRow(0.03); - -CREATE TABLE t2 AS t1; -INSERT INTO t2 VALUES (2, 2000, 1); - --- expecting 2 parts for partition '1' -SELECT partition, count(partition) FROM system.parts WHERE database==currentDatabase() AND table=='t1' AND active=='1' GROUP BY partition ORDER BY partition; - -SYSTEM START MERGES t1; -EOF - -echo Starting merge on t1 partition '1' by \'OPTIMIZE DEDUPLICATE\'ing in a background process -# Optimize deduplicate does merge, which is supposed to take some time -# Since this is a synchronous operation, starting it in the background. -$CLICKHOUSE_CLIENT -nq "OPTIMIZE TABLE t1 PARTITION id '1' DEDUPLICATE BY p;" & -merge_pid=$! - -echo "Give the server a moment to start merge" - -for _ in {1..50} -do - part_info=$($CLICKHOUSE_CLIENT -nq "SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1' FORMAT CSV") - if [[ "$part_info" == "0,\"1\"" ]]; then - break; - fi - sleep 0.1 -done - -$CLICKHOUSE_CLIENT -nm <<'EOF' --- { echo } --- assume merge started -SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1'; - - --- Should complete right away since there are no merges on partition t2 -ALTER TABLE t1 REPLACE PARTITION id '2' FROM t2; -SELECT * FROM t1 WHERE p=2; - - --- Expecting that merge is still running -SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1'; - --- Expecting that merge hasn't finished yet (since ALTER TABLE .. REPLACE wasn't waiting for it), --- and there are lots of unduplicated rows -SELECT count(*) FROM t1 WHERE p=1; -EOF - - -# TODO: remove before merging PR -wait ${merge_pid} && echo 'Merging done' - -$CLICKHOUSE_CLIENT -nm <<'EOF' --- { echo } --- check that merge is finished -SELECT is_mutation, partition_id FROM system.merges WHERE database==currentDatabase() AND table=='t1'; - --- Expecting that merge finished -SELECT * FROM t1 ORDER BY p; -EOF From 742c382db9c4e8995a99154d202ad8749ba5d51b Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Sun, 8 Sep 2024 17:23:30 +0000 Subject: [PATCH 16/17] small improvements --- src/Common/ActionBlocker.h | 2 +- src/Storages/MergeTree/PartitionActionBlocker.cpp | 5 +++-- src/Storages/MergeTree/PartitionActionBlocker.h | 2 ++ src/Storages/StorageMergeTree.cpp | 4 ++-- ...rtition_do_not_wait_mutations_on_unrelated_partitions.sql | 5 ++++- 5 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/Common/ActionBlocker.h b/src/Common/ActionBlocker.h index 055d266c72e..cf98847b852 100644 --- a/src/Common/ActionBlocker.h +++ b/src/Common/ActionBlocker.h @@ -17,7 +17,7 @@ public: bool isCancelled() const { return *counter > 0; } - /// Temporarily blocks corresponding actions (while the returned object is alive) + /// Temporary blocks corresponding actions (while the returned object is alive) friend class ActionLock; ActionLock cancel() { return ActionLock(*this); } diff --git a/src/Storages/MergeTree/PartitionActionBlocker.cpp b/src/Storages/MergeTree/PartitionActionBlocker.cpp index 5922be8d3f7..98544565647 100644 --- a/src/Storages/MergeTree/PartitionActionBlocker.cpp +++ b/src/Storages/MergeTree/PartitionActionBlocker.cpp @@ -15,7 +15,8 @@ ActionLock PartitionActionBlocker::cancelForPartition(const std::string & partit ActionLock result = partition_blockers[partition_id].cancel(); // Cleanup stale `ActionBlocker` instances once in a while, to prevent unbound growth. - if (prev_size != partition_blockers.size() && ++cleanup_counter > 32) // 32 is arbitrary. + cleanup_counter++; + if (prev_size != partition_blockers.size() && cleanup_counter > 32) // 32 is arbitrary. compactPartitionBlockersLocked(); return result; @@ -70,7 +71,7 @@ std::string PartitionActionBlocker::formatDebug() const size_t i = 0; for (const auto & p : partition_blockers) { - out << "\n\t" << DB::quote << p.first << " : " << p.second.getCounter().load(); + out << "\n\t" << DB::double_quote << p.first << ": " << p.second.getCounter().load(); if (++i < partition_blockers.size()) out << ","; diff --git a/src/Storages/MergeTree/PartitionActionBlocker.h b/src/Storages/MergeTree/PartitionActionBlocker.h index ebfb10fd732..edabd829b64 100644 --- a/src/Storages/MergeTree/PartitionActionBlocker.h +++ b/src/Storages/MergeTree/PartitionActionBlocker.h @@ -48,6 +48,8 @@ public: const std::atomic & getCounter() const { return global_blocker.getCounter(); } size_t countPartitionBlockers() const; + + /// Cleanup partition blockers void compactPartitionBlockers(); std::string formatDebug() const; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 41690a09fff..4e00d549b6f 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -1695,7 +1695,7 @@ auto getNameWithState(const auto & parts) // Same as stopMergesAndWait, but waits only for merges on parts belonging to a certain partition. ActionLock StorageMergeTree::stopMergesAndWaitForPartition(String partition_id) { - LOG_DEBUG(log, "StorageMergeTree::stopMergesAndWaitForPartition partition_id: \"{}\"", partition_id); + LOG_DEBUG(log, "StorageMergeTree::stopMergesAndWaitForPartition partition_id: `{}`", partition_id); /// Stop all merges and prevent new from starting, BUT unlike stopMergesAndWait(), only wait for the merges on small set of parts to finish. std::unique_lock lock(currently_processing_in_background_mutex); @@ -1705,7 +1705,7 @@ ActionLock StorageMergeTree::stopMergesAndWaitForPartition(String partition_id) auto merge_blocker = merger_mutator.merges_blocker.cancelForPartition(partition_id); const DataPartsVector parts_to_wait = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, partition_id); - LOG_DEBUG(log, "StorageMergeTree::stopMergesAndWaitForPartition parts to wait: {} ({} items)", + LOG_TRACE(log, "StorageMergeTree::stopMergesAndWaitForPartition parts to wait: {} ({} items)", fmt::join(getNameWithState(parts_to_wait), ", "), parts_to_wait.size()); LOG_DEBUG(log, "StorageMergeTree::stopMergesAndWaitForPartition all mutating parts: {} ({} items)", diff --git a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.sql b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.sql index 8adcc3a59f6..0e8dd503f75 100644 --- a/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.sql +++ b/tests/queries/0_stateless/02861_alter_replace_partition_do_not_wait_mutations_on_unrelated_partitions.sql @@ -20,7 +20,7 @@ INSERT INTO t1 VALUES (1, 1), (2, 2); CREATE TABLE t2 AS t1; INSERT INTO t2 VALUES (2, 2000); --- mutation that is supposed to be running in bg while REPLACE is performed. +-- mutation that is supposed to be running in background while REPLACE is performed. -- sleep(1) is arbitrary here, we just need mutation to take long enough to be noticeable. ALTER TABLE t1 UPDATE i = if(sleep(1), 0, 9000) IN PARTITION id '1' WHERE p == 1; -- check that mutation is started @@ -34,3 +34,6 @@ SELECT is_done, latest_fail_reason, parts_to_do FROM system.mutations WHERE data -- Expecting that mutation hasn't finished yet (since ALTER TABLE .. REPLACE wasn't waiting for it), -- so row with `p == 1` still has the old value of `i == 1`. SELECT * FROM t1 ORDER BY p; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; From 1964ffe688433cfb754b46bca0e0ae888dff00a5 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Mon, 9 Sep 2024 14:24:24 +0000 Subject: [PATCH 17/17] fix build --- src/Storages/StorageMergeTree.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index d020fc2fb2a..9b519245d2e 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -2169,7 +2169,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, my_metadata_snapshot); DataPartsVector src_parts; - String partition_id; + bool is_all = partition->as()->all; if (is_all) {