mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #17499 from ClickHouse/concurrent_mutation_and_random_kill
Fix kill mutation on concurrent alter queries
This commit is contained in:
commit
25f40db2fb
@ -655,6 +655,11 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
|
||||
{
|
||||
LOG_DEBUG(log, "Removing killed mutation {} from local state.", entry.znode_name);
|
||||
some_active_mutations_were_killed = true;
|
||||
if (entry.isAlterMutation())
|
||||
{
|
||||
LOG_DEBUG(log, "Removed alter {} because mutation {} were killed.", entry.alter_version, entry.znode_name);
|
||||
alter_sequence.finishDataAlter(entry.alter_version, state_lock);
|
||||
}
|
||||
}
|
||||
else
|
||||
LOG_DEBUG(log, "Removing obsolete mutation {} from local state.", entry.znode_name);
|
||||
|
@ -0,0 +1,2 @@
|
||||
CREATE TABLE default.concurrent_mutate_kill\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/test_01593/concurrent_mutate_kill\', \'1\')\nPARTITION BY key % 100\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
|
||||
499999500000
|
52
tests/queries/0_stateless/01593_concurrent_alter_mutations_kill.sh
Executable file
52
tests/queries/0_stateless/01593_concurrent_alter_mutations_kill.sh
Executable file
@ -0,0 +1,52 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_mutate_kill"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_mutate_kill (key UInt64, value String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_01593/concurrent_mutate_kill', '1') ORDER BY key PARTITION BY key % 100 SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_mutate_kill SELECT number, toString(number) FROM numbers(1000000)"
|
||||
|
||||
function alter_thread
|
||||
{
|
||||
while true; do
|
||||
TYPE=$($CLICKHOUSE_CLIENT --query "SELECT type FROM system.columns WHERE table='concurrent_mutate_kill' and database='${CLICKHOUSE_DATABASE}' and name='value'")
|
||||
if [ "$TYPE" == "String" ]; then
|
||||
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_mutate_kill MODIFY COLUMN value UInt64 SETTINGS replication_alter_partitions_sync=2"
|
||||
else
|
||||
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_mutate_kill MODIFY COLUMN value String SETTINGS replication_alter_partitions_sync=2"
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
function kill_mutation_thread
|
||||
{
|
||||
while true; do
|
||||
mutation_id=$($CLICKHOUSE_CLIENT --query "SELECT mutation_id FROM system.mutations WHERE is_done=0 and database='${CLICKHOUSE_DATABASE}' and table='concurrent_mutate_kill' LIMIT 1")
|
||||
if [ ! -z "$mutation_id" ]; then
|
||||
$CLICKHOUSE_CLIENT --query "KILL MUTATION WHERE mutation_id='$mutation_id'" 1> /dev/null
|
||||
sleep 1
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
|
||||
export -f alter_thread;
|
||||
export -f kill_mutation_thread;
|
||||
|
||||
TIMEOUT=30
|
||||
|
||||
timeout $TIMEOUT bash -c alter_thread 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c kill_mutation_thread 2> /dev/null &
|
||||
|
||||
wait
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_mutate_kill"
|
||||
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_mutate_kill MODIFY COLUMN value Int64 SETTINGS replication_alter_partitions_sync=2"
|
||||
$CLICKHOUSE_CLIENT --query "SHOW CREATE TABLE concurrent_mutate_kill"
|
||||
$CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE concurrent_mutate_kill FINAL"
|
||||
$CLICKHOUSE_CLIENT --query "SELECT sum(value) FROM concurrent_mutate_kill"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_mutate_kill"
|
@ -0,0 +1,16 @@
|
||||
499999500000
|
||||
499999500000
|
||||
499999500000
|
||||
499999500000
|
||||
499999500000
|
||||
Metadata version on replica 1 equal with first replica, OK
|
||||
CREATE TABLE default.concurrent_kill_1\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/test_01593_concurrent_kill\', \'1\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
|
||||
Metadata version on replica 2 equal with first replica, OK
|
||||
CREATE TABLE default.concurrent_kill_2\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/test_01593_concurrent_kill\', \'2\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
|
||||
Metadata version on replica 3 equal with first replica, OK
|
||||
CREATE TABLE default.concurrent_kill_3\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/test_01593_concurrent_kill\', \'3\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
|
||||
Metadata version on replica 4 equal with first replica, OK
|
||||
CREATE TABLE default.concurrent_kill_4\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/test_01593_concurrent_kill\', \'4\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
|
||||
Metadata version on replica 5 equal with first replica, OK
|
||||
CREATE TABLE default.concurrent_kill_5\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/test_01593_concurrent_kill\', \'5\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
|
||||
499999500000
|
@ -0,0 +1,79 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
REPLICAS=5
|
||||
|
||||
for i in $(seq $REPLICAS); do
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_kill_$i"
|
||||
done
|
||||
|
||||
for i in $(seq $REPLICAS); do
|
||||
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_kill_$i (key UInt64, value String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_01593_concurrent_kill', '$i') ORDER BY key SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000"
|
||||
done
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_kill_1 SELECT number, toString(number) FROM numbers(1000000)"
|
||||
|
||||
for i in $(seq $REPLICAS); do
|
||||
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_kill_$i"
|
||||
$CLICKHOUSE_CLIENT --query "SELECT sum(toUInt64(value)) FROM concurrent_kill_$i"
|
||||
done
|
||||
|
||||
function alter_thread
|
||||
{
|
||||
while true; do
|
||||
REPLICA=$(($RANDOM % 5 + 1))
|
||||
TYPE=$($CLICKHOUSE_CLIENT --query "SELECT type FROM system.columns WHERE table='concurrent_kill_$REPLICA' and database='${CLICKHOUSE_DATABASE}' and name='value'")
|
||||
if [ "$TYPE" == "String" ]; then
|
||||
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_kill_$REPLICA MODIFY COLUMN value UInt64 SETTINGS replication_alter_partitions_sync=2"
|
||||
else
|
||||
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_kill_$REPLICA MODIFY COLUMN value String SETTINGS replication_alter_partitions_sync=2"
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
function kill_mutation_thread
|
||||
{
|
||||
while true; do
|
||||
mutation_id=$($CLICKHOUSE_CLIENT --query "SELECT mutation_id FROM system.mutations WHERE is_done = 0 and table like 'concurrent_kill_%' and database='${CLICKHOUSE_DATABASE}' LIMIT 1")
|
||||
if [ ! -z "$mutation_id" ]; then
|
||||
$CLICKHOUSE_CLIENT --query "KILL MUTATION WHERE mutation_id='$mutation_id'" 1> /dev/null
|
||||
sleep 1
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
export -f alter_thread;
|
||||
export -f kill_mutation_thread;
|
||||
|
||||
TIMEOUT=30
|
||||
|
||||
timeout $TIMEOUT bash -c alter_thread 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c kill_mutation_thread 2> /dev/null &
|
||||
|
||||
wait
|
||||
|
||||
for i in $(seq $REPLICAS); do
|
||||
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_kill_$i"
|
||||
done
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_kill_$i MODIFY COLUMN value Int64 SETTINGS replication_alter_partitions_sync=2"
|
||||
|
||||
metadata_version=$($CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/test_01593_concurrent_kill/replicas/$i/' and name = 'metadata_version'")
|
||||
for i in $(seq $REPLICAS); do
|
||||
replica_metadata_version=$($CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/test_01593_concurrent_kill/replicas/$i/' and name = 'metadata_version'")
|
||||
if [ "$metadata_version" != "$replica_metadata_version" ]; then
|
||||
echo "Metadata version on replica $i differs from the first replica, FAIL"
|
||||
else
|
||||
echo "Metadata version on replica $i equal with first replica, OK"
|
||||
fi
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "SHOW CREATE TABLE concurrent_kill_$i"
|
||||
done
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "SELECT sum(value) FROM concurrent_kill_1"
|
||||
|
||||
for i in $(seq $REPLICAS); do
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_kill_$i"
|
||||
done
|
Loading…
Reference in New Issue
Block a user