Fix tests and work with multiple mutations

This commit is contained in:
alesapin 2020-07-22 18:19:54 +03:00
parent 7585fc4b52
commit 8c8bdd5070
12 changed files with 104 additions and 24 deletions

View File

@ -0,0 +1,33 @@
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Common/Exception.h>
#include <boost/algorithm/string/join.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int UNFINISHED;
}
void checkMutationStatus(std::optional<MergeTreeMutationStatus> & status, const Strings & mutation_ids)
{
if (!status)
{
assert(mutation_ids.size() == 1);
throw Exception(ErrorCodes::UNFINISHED, "Mutation {} was killed", mutation_ids[0]);
}
else if (!status->is_done && !status->latest_fail_reason.empty())
{
throw Exception(
ErrorCodes::UNFINISHED,
"Exception happened during execution of mutation{} '{}' with part '{}' reason: '{}'. This error maybe retryable or not. "
"In case of unretryable error, mutation can be killed with KILL MUTATION query",
mutation_ids.size() > 1 ? "s" : "",
boost::algorithm::join(mutation_ids, ", "),
status->latest_failed_part,
status->latest_fail_reason);
}
}
}

View File

@ -30,7 +30,8 @@ struct MergeTreeMutationStatus
/// Check mutation status and throw exception in case of error during mutation
/// (latest_fail_reason not empty) or if mutation was killed (status empty
/// optional). mutation_id passed separately, because status may be empty.
void checkMutationStatus(std::optional<MergeTreeMutationStatus> & status, const String & mutation_id);
/// optional). mutation_ids are passed separately, because status may be empty and
/// several mutations can be executed at once.
void checkMutationStatus(std::optional<MergeTreeMutationStatus> & status, const Strings & mutation_ids);
}

View File

@ -1538,16 +1538,16 @@ void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_inser
}
std::optional<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getIncompleteMutationStatus(const String & znode_name) const
std::optional<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getIncompleteMutationsStatus(const String & znode_name, Strings * mutation_ids) const
{
std::lock_guard lock(state_mutex);
auto it = mutations_by_znode.find(znode_name);
auto current_mutation_it = mutations_by_znode.find(znode_name);
/// killed
if (it == mutations_by_znode.end())
if (current_mutation_it == mutations_by_znode.end())
return {};
const MutationStatus & status = it->second;
const MutationStatus & status = current_mutation_it->second;
MergeTreeMutationStatus result
{
.is_done = status.is_done,
@ -1555,6 +1555,23 @@ std::optional<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getIncompleteMu
.latest_fail_time = status.latest_fail_time,
.latest_fail_reason = status.latest_fail_reason,
};
if (mutation_ids && !status.latest_fail_reason.empty())
{
const auto & latest_failed_part_info = status.latest_failed_part_info;
auto in_partition = mutations_by_partition.find(latest_failed_part_info.partition_id);
if (in_partition != mutations_by_partition.end())
{
const auto & version_to_status = in_partition->second;
auto begin_it = version_to_status.upper_bound(latest_failed_part_info.getDataVersion());
for (auto it = begin_it; it != version_to_status.end(); ++it)
{
/// All mutations with the same failure
if (!it->second->is_done && it->second->latest_fail_reason == status.latest_fail_reason)
mutation_ids->push_back(it->second->entry->znode_name);
}
}
}
return result;
}

View File

@ -402,8 +402,9 @@ public:
/// Return empty optional if mutation was killed. Otherwise return partially
/// filled mutation status with information about error (latest_fail*) and
/// is_done.
std::optional<MergeTreeMutationStatus> getIncompleteMutationStatus(const String & znode_name) const;
/// is_done. If mutation_ids is not null, it is filled with all unfinished mutations
/// that failed with the same error, because they may be executed simultaneously as one mutation.
std::optional<MergeTreeMutationStatus> getIncompleteMutationsStatus(const String & znode_name, Strings * mutation_ids = nullptr) const;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const;

View File

@ -410,7 +410,7 @@ void StorageMergeTree::waitForMutation(Int64 version, const String & file_name)
{
if (shutdown_called)
return true;
auto mutation_status = getIncompleteMutationStatus(version);
auto mutation_status = getIncompleteMutationsStatus(version);
return !mutation_status || mutation_status->is_done || !mutation_status->latest_fail_reason.empty();
};
@ -418,8 +418,9 @@ void StorageMergeTree::waitForMutation(Int64 version, const String & file_name)
mutation_wait_event.wait(lock, check);
}
auto mutation_status = getIncompleteMutationStatus(version);
checkMutationStatus(mutation_status, file_name);
Strings mutation_ids;
auto mutation_status = getIncompleteMutationsStatus(version, &mutation_ids);
checkMutationStatus(mutation_status, mutation_ids);
LOG_INFO(log, "Mutation {} done", file_name);
}
@ -449,29 +450,42 @@ bool comparator(const PartVersionWithName & f, const PartVersionWithName & s)
}
std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationStatus(Int64 mutation_version) const
std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsStatus(Int64 mutation_version, Strings * mutation_ids) const
{
std::lock_guard lock(currently_processing_in_background_mutex);
auto it = current_mutations_by_version.find(mutation_version);
auto current_mutation_it = current_mutations_by_version.find(mutation_version);
/// Killed
if (it == current_mutations_by_version.end())
if (current_mutation_it == current_mutations_by_version.end())
return {};
MergeTreeMutationStatus result{.is_done = false};
const auto & mutation_entry = it->second;
const auto & mutation_entry = current_mutation_it->second;
auto data_parts = getDataPartsVector();
for (const auto & data_part : data_parts)
{
if (data_part->info.getDataVersion() < mutation_version)
{
if (!mutation_entry.latest_fail_reason.empty())
{
result.latest_failed_part = mutation_entry.latest_failed_part;
result.latest_fail_reason = mutation_entry.latest_fail_reason;
result.latest_fail_time = mutation_entry.latest_fail_time;
/// Fill all mutations which failed with the same error
/// (we can execute several mutations together)
if (mutation_ids)
{
auto mutations_begin_it = current_mutations_by_version.upper_bound(data_part->info.getDataVersion());
for (auto it = mutations_begin_it; it != current_mutations_by_version.end(); ++it)
/// All mutations with the same failure
if (it->second.latest_fail_reason == result.latest_fail_reason)
mutation_ids->push_back(it->second.file_name);
}
}
return result;

View File

@ -156,8 +156,9 @@ private:
/// Return empty optional if mutation was killed. Otherwise return partially
/// filled mutation status with information about error (latest_fail*) and
/// is_done.
std::optional<MergeTreeMutationStatus> getIncompleteMutationStatus(Int64 mutation_version) const;
/// is_done. If mutation_ids is not null, it is filled with the mutations that failed
/// with the same error, because we can execute several mutations at once.
std::optional<MergeTreeMutationStatus> getIncompleteMutationsStatus(Int64 mutation_version, Strings * mutation_ids = nullptr) const;
void startBackgroundMovesIfNeeded() override;

View File

@ -386,7 +386,7 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
if (wait_event->tryWait(1000))
break;
auto mutation_status = queue.getIncompleteMutationStatus(mutation_id);
auto mutation_status = queue.getIncompleteMutationsStatus(mutation_id);
if (!mutation_status || !mutation_status->latest_fail_reason.empty())
break;
}
@ -398,8 +398,9 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
throw Exception(ErrorCodes::UNFINISHED, "Mutation {} was killed, manually removed or table was dropped", mutation_id);
}
auto mutation_status = queue.getIncompleteMutationStatus(mutation_id);
checkMutationStatus(mutation_status, mutation_id);
Strings mutation_ids;
auto mutation_status = queue.getIncompleteMutationsStatus(mutation_id, &mutation_ids);
checkMutationStatus(mutation_status, mutation_ids);
if (partial_shutdown_called)
throw Exception("Mutation is not finished because table shutdown was called. It will be done after table restart.",

View File

@ -87,6 +87,7 @@ SRCS(
MergeTree/MergeTreeDataPartWriterOnDisk.cpp
MergeTree/MergeTreeReaderInMemory.cpp
MergeTree/MergeTreeWriteAheadLog.cpp
MergeTree/MergeTreeMutationStatus.cpp
System/attachSystemTables.cpp
System/StorageSystemAggregateFunctionCombinators.cpp
System/StorageSystemAsynchronousMetrics.cpp

View File

@ -2,6 +2,7 @@
1
waiting test kill_mutation mutation_3.txt DELETE WHERE toUInt32(s) = 1
*** Create and kill invalid mutation that blocks another mutation ***
happened during execution of mutations 'mutation_4.txt, mutation_5.txt'
1
waiting test kill_mutation mutation_4.txt DELETE WHERE toUInt32(s) = 1
2001-01-01 2 b

View File

@ -14,7 +14,7 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO test.kill_mutation VALUES ('2001-01-01
${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill a single invalid mutation ***'"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync = 1" &
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync = 1" 2>/dev/null &
check_query1="SELECT count() FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation' AND is_done = 0"
@ -41,7 +41,7 @@ ${CLICKHOUSE_CLIENT} --query="SELECT mutation_id FROM system.mutations WHERE dat
${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill invalid mutation that blocks another mutation ***'"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE x = 1 SETTINGS mutations_sync = 1" &
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE x = 1 SETTINGS mutations_sync = 1" 2>&1 | grep -o "happened during execution of mutations 'mutation_4.txt, mutation_5.txt'" | head -n 1 &
check_query2="SELECT count() FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation' AND mutation_id = 'mutation_4.txt'"

View File

@ -1,6 +1,6 @@
*** Create and kill a single invalid mutation ***
happened during execution of mutation '0000000000'
1
Mutation 0000000000 was killed
waiting test kill_mutation_r1 0000000000 DELETE WHERE toUInt32(s) = 1
0
*** Create and kill invalid mutation that blocks another mutation ***

View File

@ -18,7 +18,7 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO test.kill_mutation_r1 VALUES ('2001-01
${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill a single invalid mutation ***'"
# wrong mutation
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync=2" 2>&1 | grep -o "Mutation 0000000000 was killed" &
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync=2" 2>&1 | grep -o "happened during execution of mutation '0000000000'" | head -n 1
check_query1="SELECT count() FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1' AND is_done = 0"
@ -51,6 +51,16 @@ ${CLICKHOUSE_CLIENT} --query="SELECT * FROM system.replication_queue WHERE table
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1"
check_query1="SELECT count() FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1' AND is_done = 0"
query_result=`$CLICKHOUSE_CLIENT --query="$check_query1" 2>&1`
while [ "$query_result" == "0" ]
do
query_result=`$CLICKHOUSE_CLIENT --query="$check_query1" 2>&1`
sleep 0.5
done
# good mutation, but blocked with wrong mutation
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE x = 1 SETTINGS mutations_sync=2" &