From ee084f0ec9936266f976286c4f759a19f0f3d10b Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 15 Aug 2020 08:21:02 +0300 Subject: [PATCH] Fix race condition in DETACH and background merge --- src/Storages/StorageMergeTree.cpp | 33 ++++++++++++++++--- src/Storages/StorageMergeTree.h | 2 ++ .../01060_shutdown_table_after_detach.sql | 2 +- .../01442_merge_detach_attach.reference | 0 .../0_stateless/01442_merge_detach_attach.sh | 21 ++++++++++++ .../01443_merge_truncate.reference | 0 .../0_stateless/01443_merge_truncate.sh | 21 ++++++++++++ 7 files changed, 74 insertions(+), 5 deletions(-) create mode 100644 tests/queries/0_stateless/01442_merge_detach_attach.reference create mode 100755 tests/queries/0_stateless/01442_merge_detach_attach.sh create mode 100644 tests/queries/0_stateless/01443_merge_truncate.reference create mode 100755 tests/queries/0_stateless/01443_merge_truncate.sh diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index afaf6099937..64d880e6266 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -40,6 +40,7 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; extern const int INCORRECT_DATA; extern const int CANNOT_ASSIGN_OPTIMIZE; + extern const int TIMEOUT_EXCEEDED; } namespace ActionLocks @@ -138,7 +139,6 @@ void StorageMergeTree::shutdown() mutation_wait_event.notify_all(); } - merger_mutator.merges_blocker.cancelForever(); parts_mover.moves_blocker.cancelForever(); @@ -148,7 +148,6 @@ void StorageMergeTree::shutdown() if (moving_task_handle) global_context.getBackgroundMovePool().removeTask(moving_task_handle); - try { /// We clear all old parts after stopping all background operations. @@ -220,7 +219,7 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, cons { /// Asks to complete merges and does not allow them to start. /// This protects against "revival" of data for a removed partition after completion of merge. - auto merge_blocker = merger_mutator.merges_blocker.cancel(); + auto merge_blocker = stopMergesAndWait(); /// NOTE: It's assumed that this method is called under lockForAlter. @@ -1140,12 +1139,38 @@ Pipe StorageMergeTree::alterPartition( return {}; } + +ActionLock StorageMergeTree::stopMergesAndWait() +{ + /// Asks to complete merges and does not allow them to start. + /// This protects against "revival" of data for a removed partition after completion of merge. + auto merge_blocker = merger_mutator.merges_blocker.cancel(); + + { + std::unique_lock lock(currently_processing_in_background_mutex); + while (!currently_merging_mutating_parts.empty()) + { + LOG_DEBUG(log, "Waiting for currently running merges ({} parts are merging right now)", + currently_merging_mutating_parts.size()); + + if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for( + lock, std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC))) + { + throw Exception("Timeout while waiting for already running merges", ErrorCodes::TIMEOUT_EXCEEDED); + } + } + } + + return merge_blocker; +} + + void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, const Context & context) { { /// Asks to complete merges and does not allow them to start. /// This protects against "revival" of data for a removed partition after completion of merge. - auto merge_blocker = merger_mutator.merges_blocker.cancel(); + auto merge_blocker = stopMergesAndWait(); auto metadata_snapshot = getInMemoryMetadataPtr(); String partition_id = getPartitionIDFromQuery(partition, context); diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 66d6c64d705..5662f9e0088 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -128,6 +128,8 @@ private: */ bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, String * out_disable_reason = nullptr); + ActionLock stopMergesAndWait(); + BackgroundProcessingPoolTaskResult movePartsTask(); /// Allocate block number for new mutation, write mutation to disk diff --git a/tests/queries/0_stateless/01060_shutdown_table_after_detach.sql b/tests/queries/0_stateless/01060_shutdown_table_after_detach.sql index 730263a2b12..1987fffaa58 100644 --- a/tests/queries/0_stateless/01060_shutdown_table_after_detach.sql +++ b/tests/queries/0_stateless/01060_shutdown_table_after_detach.sql @@ -3,7 +3,7 @@ CREATE TABLE test Engine = MergeTree ORDER BY number AS SELECT number, toString( SELECT count() FROM test; -ALTER TABLE test detach partition tuple(); +ALTER TABLE test DETACH PARTITION tuple(); SELECT count() FROM test; diff --git a/tests/queries/0_stateless/01442_merge_detach_attach.reference b/tests/queries/0_stateless/01442_merge_detach_attach.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01442_merge_detach_attach.sh b/tests/queries/0_stateless/01442_merge_detach_attach.sh new file mode 100755 index 00000000000..4577d805da5 --- /dev/null +++ b/tests/queries/0_stateless/01442_merge_detach_attach.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +set -e + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CUR_DIR"/../shell_config.sh + +CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=none/g') + +${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS t" +${CLICKHOUSE_CLIENT} --query="CREATE TABLE t (x Int8) ENGINE = MergeTree ORDER BY tuple()" + +for _ in {1..100}; do + ${CLICKHOUSE_CLIENT} --query="INSERT INTO t VALUES (0)" + ${CLICKHOUSE_CLIENT} --query="INSERT INTO t VALUES (0)" + ${CLICKHOUSE_CLIENT} --query="OPTIMIZE TABLE t FINAL" 2>/dev/null & + ${CLICKHOUSE_CLIENT} --query="ALTER TABLE t DETACH PARTITION tuple()" + ${CLICKHOUSE_CLIENT} --query="SELECT count() FROM t HAVING count() > 0" +done + +wait diff --git a/tests/queries/0_stateless/01443_merge_truncate.reference b/tests/queries/0_stateless/01443_merge_truncate.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01443_merge_truncate.sh b/tests/queries/0_stateless/01443_merge_truncate.sh new file mode 100755 index 00000000000..5a0d1188ab6 --- /dev/null +++ b/tests/queries/0_stateless/01443_merge_truncate.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +set -e + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CUR_DIR"/../shell_config.sh + +CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=none/g') + +${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS t" +${CLICKHOUSE_CLIENT} --query="CREATE TABLE t (x Int8) ENGINE = MergeTree ORDER BY tuple()" + +for _ in {1..100}; do + ${CLICKHOUSE_CLIENT} --query="INSERT INTO t VALUES (0)" + ${CLICKHOUSE_CLIENT} --query="INSERT INTO t VALUES (0)" + ${CLICKHOUSE_CLIENT} --query="OPTIMIZE TABLE t FINAL" 2>/dev/null & + ${CLICKHOUSE_CLIENT} --query="TRUNCATE TABLE t" + ${CLICKHOUSE_CLIENT} --query="SELECT count() FROM t HAVING count() > 0" +done + +wait