Fix race condition in DETACH and background merge

This commit is contained in:
Alexey Milovidov 2020-08-15 08:21:02 +03:00
parent e032593005
commit ee084f0ec9
7 changed files with 74 additions and 5 deletions

View File

@ -40,6 +40,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
extern const int INCORRECT_DATA; extern const int INCORRECT_DATA;
extern const int CANNOT_ASSIGN_OPTIMIZE; extern const int CANNOT_ASSIGN_OPTIMIZE;
extern const int TIMEOUT_EXCEEDED;
} }
namespace ActionLocks namespace ActionLocks
@ -138,7 +139,6 @@ void StorageMergeTree::shutdown()
mutation_wait_event.notify_all(); mutation_wait_event.notify_all();
} }
merger_mutator.merges_blocker.cancelForever(); merger_mutator.merges_blocker.cancelForever();
parts_mover.moves_blocker.cancelForever(); parts_mover.moves_blocker.cancelForever();
@ -148,7 +148,6 @@ void StorageMergeTree::shutdown()
if (moving_task_handle) if (moving_task_handle)
global_context.getBackgroundMovePool().removeTask(moving_task_handle); global_context.getBackgroundMovePool().removeTask(moving_task_handle);
try try
{ {
/// We clear all old parts after stopping all background operations. /// We clear all old parts after stopping all background operations.
@ -220,7 +219,7 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, cons
{ {
/// Asks to complete merges and does not allow them to start. /// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge. /// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.merges_blocker.cancel(); auto merge_blocker = stopMergesAndWait();
/// NOTE: It's assumed that this method is called under lockForAlter. /// NOTE: It's assumed that this method is called under lockForAlter.
@ -1140,12 +1139,38 @@ Pipe StorageMergeTree::alterPartition(
return {}; return {};
} }
/// Cancels background merges and waits until no parts are being merged or mutated.
///
/// Returns the ActionLock produced by merges_blocker.cancel(), so the caller decides how long
/// it is held (presumably merges stay blocked while it is alive - confirm against ActionLock).
///
/// Throws Exception with ErrorCodes::TIMEOUT_EXCEEDED if a single wait lasts
/// DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC seconds and parts are still being merged afterwards.
ActionLock StorageMergeTree::stopMergesAndWait()
{
    /// Asks to complete merges and does not allow them to start.
    /// This protects against "revival" of data for a removed partition after completion of merge.
    /// NOTE(review): the blocker is taken before the mutex below is locked; confirm that a merge
    /// cannot be selected and registered in between the two steps.
    auto merge_blocker = merger_mutator.merges_blocker.cancel();

    {
        std::unique_lock lock(currently_processing_in_background_mutex);
        while (!currently_merging_mutating_parts.empty())
        {
            LOG_DEBUG(log, "Waiting for currently running merges ({} parts are merging right now)",
                currently_merging_mutating_parts.size());

            const auto wait_status = currently_processing_in_background_condition.wait_for(
                lock, std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC));

            /// cv_status::timeout only says that the time has elapsed. The last merge may have finished
            /// at the same moment, so look at the set again (the lock is re-acquired here) before failing.
            if (wait_status == std::cv_status::timeout && !currently_merging_mutating_parts.empty())
                throw Exception("Timeout while waiting for already running merges", ErrorCodes::TIMEOUT_EXCEEDED);
        }
    }

    return merge_blocker;
}
void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, const Context & context) void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, const Context & context)
{ {
{ {
/// Asks to complete merges and does not allow them to start. /// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge. /// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.merges_blocker.cancel(); auto merge_blocker = stopMergesAndWait();
auto metadata_snapshot = getInMemoryMetadataPtr(); auto metadata_snapshot = getInMemoryMetadataPtr();
String partition_id = getPartitionIDFromQuery(partition, context); String partition_id = getPartitionIDFromQuery(partition, context);

View File

@ -128,6 +128,8 @@ private:
*/ */
bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, String * out_disable_reason = nullptr); bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, String * out_disable_reason = nullptr);
ActionLock stopMergesAndWait();
BackgroundProcessingPoolTaskResult movePartsTask(); BackgroundProcessingPoolTaskResult movePartsTask();
/// Allocate block number for new mutation, write mutation to disk /// Allocate block number for new mutation, write mutation to disk

View File

@ -3,7 +3,7 @@ CREATE TABLE test Engine = MergeTree ORDER BY number AS SELECT number, toString(
SELECT count() FROM test; SELECT count() FROM test;
ALTER TABLE test detach partition tuple(); ALTER TABLE test DETACH PARTITION tuple();
SELECT count() FROM test; SELECT count() FROM test;

View File

@ -0,0 +1,21 @@
#!/usr/bin/env bash
# Stress test: ALTER TABLE ... DETACH PARTITION issued while OPTIMIZE ... FINAL runs in the background.
# Each round issues two INSERTs, starts OPTIMIZE asynchronously, detaches the partition and then
# selects the row count; the HAVING clause makes that SELECT print a row only when the count is non-zero.
set -e

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source "${CUR_DIR}/../shell_config.sh"

# Rewrite the configured --send_logs_level option of the client command to "none".
logs_level_rewrite="s/--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}/--send_logs_level=none/g"
CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed "${logs_level_rewrite}")

# Runs a single SQL statement through the client command (left unquoted so its options are word-split).
run_query()
{
    ${CLICKHOUSE_CLIENT} --query="$1"
}

run_query "DROP TABLE IF EXISTS t"
run_query "CREATE TABLE t (x Int8) ENGINE = MergeTree ORDER BY tuple()"

for ((round = 1; round <= 100; round++)); do
    run_query "INSERT INTO t VALUES (0)"
    run_query "INSERT INTO t VALUES (0)"
    # The merge request runs concurrently with the DETACH below; its stderr is discarded.
    run_query "OPTIMIZE TABLE t FINAL" 2>/dev/null &
    run_query "ALTER TABLE t DETACH PARTITION tuple()"
    run_query "SELECT count() FROM t HAVING count() > 0"
done

# Collect all background OPTIMIZE jobs before exiting.
wait

View File

@ -0,0 +1,21 @@
#!/usr/bin/env bash
# Stress test: TRUNCATE TABLE issued while OPTIMIZE ... FINAL runs in the background.
# Each round issues two INSERTs, starts OPTIMIZE asynchronously, truncates the table and then
# selects the row count; the HAVING clause makes that SELECT print a row only when the count is non-zero.
set -e

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source "${CUR_DIR}/../shell_config.sh"

# Rewrite the configured --send_logs_level option of the client command to "none".
logs_level_rewrite="s/--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}/--send_logs_level=none/g"
CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed "${logs_level_rewrite}")

# Runs a single SQL statement through the client command (left unquoted so its options are word-split).
run_query()
{
    ${CLICKHOUSE_CLIENT} --query="$1"
}

run_query "DROP TABLE IF EXISTS t"
run_query "CREATE TABLE t (x Int8) ENGINE = MergeTree ORDER BY tuple()"

for ((round = 1; round <= 100; round++)); do
    run_query "INSERT INTO t VALUES (0)"
    run_query "INSERT INTO t VALUES (0)"
    # The merge request runs concurrently with the TRUNCATE below; its stderr is discarded.
    run_query "OPTIMIZE TABLE t FINAL" 2>/dev/null &
    run_query "TRUNCATE TABLE t"
    run_query "SELECT count() FROM t HAVING count() > 0"
done

# Collect all background OPTIMIZE jobs before exiting.
wait