mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-25 03:00:49 +00:00
Add test for parallel detach and fix some bugs.
This commit is contained in:
parent
4300cc523d
commit
0a5403185c
@ -152,7 +152,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
|
|||||||
if (entry->type == LogEntry::ALTER_METADATA)
|
if (entry->type == LogEntry::ALTER_METADATA)
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Adding alter metadata version " << entry->alter_version << " to the queue");
|
LOG_TRACE(log, "Adding alter metadata version " << entry->alter_version << " to the queue");
|
||||||
alter_chain.addMetadataAlter(entry->alter_version, state_lock);
|
alter_chain.addMetadataAlter(entry->alter_version, entry->have_mutation, state_lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -231,7 +231,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
|
|||||||
if (entry->type == LogEntry::ALTER_METADATA)
|
if (entry->type == LogEntry::ALTER_METADATA)
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Finishing metadata alter with version " << entry->alter_version);
|
LOG_TRACE(log, "Finishing metadata alter with version " << entry->alter_version);
|
||||||
alter_chain.finishMetadataAlter(entry->alter_version, entry->have_mutation, state_lock);
|
alter_chain.finishMetadataAlter(entry->alter_version, state_lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -1357,12 +1357,12 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
|
|||||||
{
|
{
|
||||||
LOG_TRACE(log, "Marking mutation " << znode << " done because it is <= mutation_pointer (" << mutation_pointer << ")");
|
LOG_TRACE(log, "Marking mutation " << znode << " done because it is <= mutation_pointer (" << mutation_pointer << ")");
|
||||||
mutation.is_done = true;
|
mutation.is_done = true;
|
||||||
|
alter_chain.finishDataAlter(mutation.entry->alter_version, lock);
|
||||||
if (mutation.parts_to_do.size() != 0)
|
if (mutation.parts_to_do.size() != 0)
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Seems like we jumped over mutation " << znode << " when downloaded part with bigger mutation number."
|
LOG_INFO(log, "Seems like we jumped over mutation " << znode << " when downloaded part with bigger mutation number."
|
||||||
<< " It's OK, tasks for rest parts will be skipped, but probably a lot of mutations were executed concurrently on different replicas.");
|
<< " It's OK, tasks for rest parts will be skipped, but probably a lot of mutations were executed concurrently on different replicas.");
|
||||||
mutation.parts_to_do.clear();
|
mutation.parts_to_do.clear();
|
||||||
alter_chain.finishDataAlter(mutation.entry->alter_version, lock);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (mutation.parts_to_do.size() == 0)
|
else if (mutation.parts_to_do.size() == 0)
|
||||||
|
@ -32,8 +32,8 @@ private:
|
|||||||
};
|
};
|
||||||
|
|
||||||
std::map<int, AlterState> queue_state;
|
std::map<int, AlterState> queue_state;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
||||||
int getHeadAlterVersion(std::lock_guard<std::mutex> & /*state_lock*/) const
|
int getHeadAlterVersion(std::lock_guard<std::mutex> & /*state_lock*/) const
|
||||||
{
|
{
|
||||||
if (!queue_state.empty())
|
if (!queue_state.empty())
|
||||||
@ -44,25 +44,25 @@ public:
|
|||||||
void addMutationForAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
|
void addMutationForAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
|
||||||
{
|
{
|
||||||
if (!queue_state.count(alter_version))
|
if (!queue_state.count(alter_version))
|
||||||
queue_state.emplace(alter_version, AlterState{false, false});
|
queue_state.emplace(alter_version, AlterState{true, false});
|
||||||
else
|
else
|
||||||
queue_state[alter_version].data_finished = false;
|
queue_state[alter_version].data_finished = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void addMetadataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
|
void addMetadataAlter(int alter_version, bool have_mutation, std::lock_guard<std::mutex> & /*state_lock*/)
|
||||||
{
|
{
|
||||||
if (!queue_state.count(alter_version))
|
if (!queue_state.count(alter_version))
|
||||||
queue_state.emplace(alter_version, AlterState{false, true});
|
queue_state.emplace(alter_version, AlterState{false, !have_mutation});
|
||||||
else
|
else
|
||||||
queue_state[alter_version].metadata_finished = false;
|
queue_state[alter_version].metadata_finished = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void finishMetadataAlter(int alter_version, bool have_mutation, std::unique_lock <std::mutex> & /*state_lock*/)
|
void finishMetadataAlter(int alter_version, std::unique_lock <std::mutex> & /*state_lock*/)
|
||||||
{
|
{
|
||||||
assert(!queue_state.empty());
|
assert(!queue_state.empty());
|
||||||
assert(queue_state.begin()->first == alter_version);
|
assert(queue_state.begin()->first == alter_version);
|
||||||
|
|
||||||
if (!have_mutation)
|
if (queue_state[alter_version].data_finished)
|
||||||
queue_state.erase(alter_version);
|
queue_state.erase(alter_version);
|
||||||
else
|
else
|
||||||
queue_state[alter_version].metadata_finished = true;
|
queue_state[alter_version].metadata_finished = true;
|
||||||
|
@ -0,0 +1,11 @@
|
|||||||
|
1725
|
||||||
|
1725
|
||||||
|
1725
|
||||||
|
Starting alters
|
||||||
|
Finishing alters
|
||||||
|
1
|
||||||
|
0
|
||||||
|
1
|
||||||
|
0
|
||||||
|
1
|
||||||
|
0
|
108
dbms/tests/queries/0_stateless/01079_parallel_alter_detach_table.sh
Executable file
108
dbms/tests/queries/0_stateless/01079_parallel_alter_detach_table.sh
Executable file
@ -0,0 +1,108 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
. $CURDIR/../shell_config.sh
|
||||||
|
|
||||||
|
REPLICAS=3
|
||||||
|
|
||||||
|
for i in `seq $REPLICAS`; do
|
||||||
|
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_detach_$i"
|
||||||
|
done
|
||||||
|
|
||||||
|
for i in `seq $REPLICAS`; do
|
||||||
|
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_detach_$i (key UInt64, value1 UInt8, value2 UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/concurrent_alter_mt', '$i') ORDER BY key SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000"
|
||||||
|
done
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_detach_1 SELECT number, number + 10, number from numbers(10)"
|
||||||
|
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_detach_1 SELECT number, number + 10, number from numbers(10, 40)"
|
||||||
|
|
||||||
|
for i in `seq $REPLICAS`; do
|
||||||
|
$CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_detach_$i"
|
||||||
|
done
|
||||||
|
|
||||||
|
INITIAL_SUM=`$CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_detach_1"`
|
||||||
|
|
||||||
|
# This alters mostly requires not only metadata change
|
||||||
|
# but also conversion of data. Also they are all compatible
|
||||||
|
# between each other, so can be executed concurrently.
|
||||||
|
function correct_alter_thread()
|
||||||
|
{
|
||||||
|
TYPES=(Float64 String UInt8 UInt32)
|
||||||
|
while true; do
|
||||||
|
REPLICA=$(($RANDOM % 3 + 1))
|
||||||
|
TYPE=${TYPES[$RANDOM % ${#TYPES[@]} ]}
|
||||||
|
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_detach_$REPLICA MODIFY COLUMN value1 $TYPE SETTINGS replication_alter_partitions_sync=2"; # additionaly we don't wait anything for more heavy concurrency
|
||||||
|
sleep 0.$RANDOM
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
# This thread add some data to table. After we finish we can check, that
|
||||||
|
# all our data have same types.
|
||||||
|
# insert queries will fail sometime because of wrong types.
|
||||||
|
function insert_thread()
|
||||||
|
{
|
||||||
|
|
||||||
|
VALUES=(7.0 7 '7')
|
||||||
|
while true; do
|
||||||
|
REPLICA=$(($RANDOM % 3 + 1))
|
||||||
|
VALUE=${VALUES[$RANDOM % ${#VALUES[@]} ]}
|
||||||
|
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_detach_$REPLICA VALUES($RANDOM, $VALUE, $VALUE)"
|
||||||
|
sleep 0.$RANDOM
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
function detach_attach_thread()
|
||||||
|
{
|
||||||
|
while true; do
|
||||||
|
REPLICA=$(($RANDOM % 3 + 1))
|
||||||
|
$CLICKHOUSE_CLIENT --query "DETACH TABLE concurrent_alter_detach_$REPLICA"
|
||||||
|
sleep 0.$RANDOM
|
||||||
|
$CLICKHOUSE_CLIENT --query "ATTACH TABLE concurrent_alter_detach_$REPLICA"
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
echo "Starting alters"
|
||||||
|
export -f correct_alter_thread;
|
||||||
|
export -f insert_thread;
|
||||||
|
export -f detach_attach_thread;
|
||||||
|
|
||||||
|
TIMEOUT=30
|
||||||
|
|
||||||
|
# Sometimes we detach and attach tables
|
||||||
|
timeout $TIMEOUT bash -c detach_attach_thread 2> /dev/null &
|
||||||
|
timeout $TIMEOUT bash -c detach_attach_thread 2> /dev/null &
|
||||||
|
|
||||||
|
timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
|
||||||
|
timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
|
||||||
|
|
||||||
|
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
|
||||||
|
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
|
||||||
|
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
|
||||||
|
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
|
||||||
|
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
|
||||||
|
|
||||||
|
wait
|
||||||
|
|
||||||
|
echo "Finishing alters"
|
||||||
|
|
||||||
|
for i in `seq $REPLICAS`; do
|
||||||
|
$CLICKHOUSE_CLIENT --query "ATTACH TABLE concurrent_alter_detach_$i" 2> /dev/null
|
||||||
|
done
|
||||||
|
|
||||||
|
|
||||||
|
for i in `seq $REPLICAS`; do
|
||||||
|
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_detach_$i"
|
||||||
|
done
|
||||||
|
|
||||||
|
|
||||||
|
# This alter will finish all previous, but replica 1 maybe still not up-to-date
|
||||||
|
while [[ $($CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_detach_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>&1) ]]; do
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
|
||||||
|
for i in `seq $REPLICAS`; do
|
||||||
|
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_detach_$i"
|
||||||
|
$CLICKHOUSE_CLIENT --query "SELECT SUM(toUInt64(value1)) > $INITIAL_SUM FROM concurrent_alter_detach_$i"
|
||||||
|
$CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM system.mutations WHERE is_done=0" # all mutations have to be done
|
||||||
|
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_detach_$i"
|
||||||
|
done
|
Loading…
Reference in New Issue
Block a user