Fix mutations finalization task

This commit is contained in:
alesapin 2020-02-05 19:30:02 +03:00
parent 8c23840cc8
commit 1923930379
5 changed files with 213 additions and 23 deletions

View File

@ -55,6 +55,18 @@ public:
return remove(MergeTreePartInfo::fromPartName(part_name, format_version));
}
/// Remove part and all covered parts from active set
bool removePartAndCoveredParts(const String & part_name)
{
Strings parts_covered_by = getPartsCoveredBy(MergeTreePartInfo::fromPartName(part_name, format_version));
bool result = true;
result &= remove(part_name);
for (const auto & part : parts_covered_by)
result &= remove(part);
return result;
}
/// If not found, return an empty string.
String getContainingPart(const MergeTreePartInfo & part_info) const;
String getContainingPart(const String & name) const;
@ -66,6 +78,11 @@ public:
size_t size() const;
void clear()
{
part_info_to_name.clear();
}
MergeTreeDataFormatVersion getFormatVersion() const { return format_version; }
private:

View File

@ -130,7 +130,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
for (const String & virtual_part_name : entry->getVirtualPartNames())
{
virtual_parts.add(virtual_part_name);
updateMutationsPartsToDo(virtual_part_name, /* add = */ true);
addPartToMutations(virtual_part_name);
}
/// Put 'DROP PARTITION' entries at the beginning of the queue not to make superfluous fetches of parts that will be eventually deleted
@ -200,12 +200,16 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
for (const String & virtual_part_name : entry->getVirtualPartNames())
{
Strings replaced_parts;
/// In most cases we will replace only current parts, but sometimes
/// we can even replace virtual parts. For example when we failed to
/// GET source part and dowloaded merged/mutated part instead.
current_parts.add(virtual_part_name, &replaced_parts);
virtual_parts.add(virtual_part_name, &replaced_parts);
/// Each part from `replaced_parts` should become Obsolete as a result of executing the entry.
/// So it is one less part to mutate for each mutation with block number greater than part_info.getDataVersion()
/// So it is one less part to mutate for each mutation with block number greater or equal than part_info.getDataVersion()
for (const String & replaced_part_name : replaced_parts)
updateMutationsPartsToDo(replaced_part_name, /* add = */ false);
removePartFromMutations(replaced_part_name);
}
String drop_range_part_name;
@ -226,13 +230,13 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
{
/// Because execution of the entry is unsuccessful, `virtual_part_name` will never appear
/// so we won't need to mutate it.
updateMutationsPartsToDo(virtual_part_name, /* add = */ false);
removePartFromMutations(virtual_part_name);
}
}
}
void ReplicatedMergeTreeQueue::updateMutationsPartsToDo(const String & part_name, bool add)
void ReplicatedMergeTreeQueue::removePartFromMutations(const String & part_name)
{
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
auto in_partition = mutations_by_partition.find(part_info.partition_id);
@ -241,15 +245,15 @@ void ReplicatedMergeTreeQueue::updateMutationsPartsToDo(const String & part_name
bool some_mutations_are_probably_done = false;
auto from_it = in_partition->second.upper_bound(part_info.getDataVersion());
auto from_it = in_partition->second.lower_bound(part_info.getDataVersion());
for (auto it = from_it; it != in_partition->second.end(); ++it)
{
MutationStatus & status = *it->second;
status.parts_to_do += (add ? +1 : -1);
if (status.parts_to_do <= 0)
status.parts_to_do.removePartAndCoveredParts(part_name);
if (status.parts_to_do.size() == 0)
some_mutations_are_probably_done = true;
if (!add && !status.latest_failed_part.empty() && part_info.contains(status.latest_failed_part_info))
if (!status.latest_failed_part.empty() && part_info.contains(status.latest_failed_part_info))
{
status.latest_failed_part.clear();
status.latest_failed_part_info = MergeTreePartInfo();
@ -262,6 +266,20 @@ void ReplicatedMergeTreeQueue::updateMutationsPartsToDo(const String & part_name
storage.mutations_finalizing_task->schedule();
}
void ReplicatedMergeTreeQueue::addPartToMutations(const String & part_name)
{
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
auto in_partition = mutations_by_partition.find(part_info.partition_id);
if (in_partition == mutations_by_partition.end())
return;
auto from_it = in_partition->second.upper_bound(part_info.getDataVersion());
for (auto it = from_it; it != in_partition->second.end(); ++it)
{
MutationStatus & status = *it->second;
status.parts_to_do.add(part_name);
}
}
void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
zkutil::ZooKeeperPtr zookeeper,
@ -629,7 +647,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations)
{
auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry))
auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version))
.first->second;
for (const auto & pair : entry->block_numbers)
@ -640,7 +658,9 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
}
/// Initialize `mutation.parts_to_do`. First we need to mutate all parts in `current_parts`.
mutation.parts_to_do += getPartNamesToMutate(*entry, current_parts).size();
Strings current_parts_to_mutate = getPartNamesToMutate(*entry, current_parts);
for (const String & current_part_to_mutate : current_parts_to_mutate)
mutation.parts_to_do.add(current_part_to_mutate);
/// And next we would need to mutate all parts with getDataVersion() greater than
/// mutation block number that would appear as a result of executing the queue.
@ -651,11 +671,11 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
auto part_info = MergeTreePartInfo::fromPartName(produced_part_name, format_version);
auto it = entry->block_numbers.find(part_info.partition_id);
if (it != entry->block_numbers.end() && it->second > part_info.getDataVersion())
++mutation.parts_to_do;
mutation.parts_to_do.add(produced_part_name);
}
}
if (mutation.parts_to_do == 0)
if (mutation.parts_to_do.size() == 0)
some_mutations_are_probably_done = true;
}
}
@ -1277,8 +1297,14 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
{
LOG_TRACE(log, "Marking mutation " << znode << " done because it is <= mutation_pointer (" << mutation_pointer << ")");
mutation.is_done = true;
if (mutation.parts_to_do.size() != 0)
{
LOG_INFO(log, "Seems like we jumped over mutation " << znode << " when downloaded part with bigger mutation number."
<< " It's OK, tasks for rest parts will be skipped, but probably a lot of mutations were executed concurrently on different replicas.");
mutation.parts_to_do.clear();
}
}
else if (mutation.parts_to_do == 0)
else if (mutation.parts_to_do.size() == 0)
{
LOG_TRACE(log, "Will check if mutation " << mutation.entry->znode_name << " is done");
candidates.push_back(mutation.entry);
@ -1417,7 +1443,7 @@ std::vector<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getMutationsStatu
{
const MutationStatus & status = pair.second;
const ReplicatedMergeTreeMutationEntry & entry = *status.entry;
const Names parts_to_mutate = getPartNamesToMutate(entry, current_parts);
Names parts_to_mutate = status.parts_to_do.getParts();
for (const MutationCommand & command : entry.commands)
{

View File

@ -97,18 +97,21 @@ private:
struct MutationStatus
{
MutationStatus(const ReplicatedMergeTreeMutationEntryPtr & entry_)
MutationStatus(const ReplicatedMergeTreeMutationEntryPtr & entry_, MergeTreeDataFormatVersion format_version_)
: entry(entry_)
, parts_to_do(format_version_)
{
}
ReplicatedMergeTreeMutationEntryPtr entry;
/// A number of parts that should be mutated/merged or otherwise moved to Obsolete state for this mutation to complete.
Int64 parts_to_do = 0;
/// Parts we have to mutate to complete mutation. We use ActiveDataPartSet structure
/// to be able to manage covering and covered parts.
ActiveDataPartSet parts_to_do;
/// Note that is_done is not equivalent to parts_to_do == 0
/// (even if parts_to_do == 0 some relevant parts can still commit in the future).
/// Note that is_done is not equivalent to parts_to_do.size() == 0
/// (even if parts_to_do.size() == 0 some relevant parts can still commit in the future).
/// Also we can jump over mutation when we dowload mutated part from other replica.
bool is_done = false;
String latest_failed_part;
@ -191,9 +194,14 @@ private:
std::optional<time_t> & max_processed_insert_time_changed,
std::unique_lock<std::mutex> & state_lock);
/// If the new part appears (add == true) or becomes obsolete (add == false), update parts_to_do of all affected mutations.
/// Notifies storage.mutations_finalizing_task if some mutations are probably finished.
void updateMutationsPartsToDo(const String & part_name, bool add);
/// Add part for mutations with block_number > part.getDataVersion()
void addPartToMutations(const String & part_name);
/// Remove part from mutations which were assigned to mutate it
/// with block_number > part.getDataVersion()
/// and block_number == part.getDataVersion()
/// ^ (this may happen if we downloaded mutated part from other replica)
void removePartFromMutations(const String & part_name);
/// Update the insertion times in ZooKeeper.
void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper,

View File

@ -0,0 +1,17 @@
1725
1725
1725
1725
1725
Starting alters
Finishing alters
1
0
1
0
1
0
1
0
1
0

View File

@ -0,0 +1,122 @@
#!/usr/bin/env bash
# This test checks mutations concurrent execution with concurrent inserts.
# There was a bug in mutations finalization, when mutation finishes not after all
# MUTATE_PART tasks execution, but after GET of already mutated part from other replica.
# To test it we stop replicated sends on some replicas to delay fetch of required parts for mutation.
# Since our replication queue executing tasks concurrently it may happen, that we dowload already mutated
# part before source part.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
REPLICAS=5
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i"
done
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_mt_$i (key UInt64, value1 UInt64, value2 String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/concurrent_alter_mt', '$i') ORDER BY key"
done
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, toString(number) from numbers(10)"
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, toString(number) from numbers(10, 40)"
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i"
done
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_mt_$i"
done
INITIAL_SUM=`$CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_mt_1"`
# Run mutation on random replica
function correct_alter_thread()
{
TYPES=(Float64 String UInt8 UInt32)
while true; do
REPLICA=$(($RANDOM % 5 + 1))
TYPE=${TYPES[$RANDOM % ${#TYPES[@]} ]}
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_$REPLICA UPDATE value1 = value1 + 1 WHERE 1";
sleep 0.$RANDOM
done
}
# This thread add some data to table.
function insert_thread()
{
VALUES=(7 8 9)
while true; do
REPLICA=$(($RANDOM % 5 + 1))
VALUE=${VALUES[$RANDOM % ${#VALUES[@]} ]}
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_$REPLICA VALUES($RANDOM, $VALUE, toString($VALUE))"
sleep 0.$RANDOM
done
}
# Stop sends for some times on random replica
function stop_sends_thread()
{
while true; do
REPLICA=$(($RANDOM % 5 + 1))
$CLICKHOUSE_CLIENT --query "SYSTEM STOP REPLICATED SENDS concurrent_alter_mt_$REPLICA"
sleep 0.$RANDOM
sleep 0.$RANDOM
sleep 0.$RANDOM
$CLICKHOUSE_CLIENT --query "SYSTEM START REPLICATED SENDS concurrent_alter_mt_$REPLICA"
sleep 0.$RANDOM
done
}
echo "Starting alters"
export -f correct_alter_thread;
export -f insert_thread;
export -f stop_sends_thread;
TIMEOUT=30
timeout $TIMEOUT bash -c stop_sends_thread 2> /dev/null &
timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
wait
echo "Finishing alters"
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "SYSTEM START REPLICATED SENDS concurrent_alter_mt_$i"
done
# This alter will wait for all previous mutations
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 UPDATE value1 = value1 + 1 WHERE 1 SETTINGS mutations_sync = 2"
# maybe it's redundant
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i"
done
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "SELECT SUM(toUInt64(value1)) > $INITIAL_SUM FROM concurrent_alter_mt_$i"
$CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM system.mutations WHERE is_done=0" # all mutations have to be done
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i"
done