add latest_fail_error_code_name column to system.mutations

This commit is contained in:
Michael Stetsyuk 2024-11-22 15:24:46 +00:00
parent d3e3082807
commit 24e11b59d5
11 changed files with 43 additions and 19 deletions

View File

@ -613,6 +613,7 @@
M(733, TABLE_IS_BEING_RESTARTED) \
M(734, CANNOT_WRITE_AFTER_BUFFER_CANCELED) \
M(735, QUERY_WAS_CANCELLED_BY_CLIENT) \
M(736, PART_IS_LOCKED) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@ -29,6 +29,7 @@ struct MergeTreeMutationEntry
MergeTreePartInfo latest_failed_part_info;
time_t latest_fail_time = 0;
String latest_fail_reason;
String latest_fail_error_code_name;
/// ID of transaction which has created mutation.
TransactionID tid = Tx::PrehistoricTID;

View File

@ -28,6 +28,7 @@ struct MergeTreeMutationStatus
String latest_failed_part = "";
time_t latest_fail_time = 0;
String latest_fail_reason = "";
String latest_fail_error_code_name = "";
/// FIXME: currently unused, but would be much better to report killed mutations with this flag.
bool is_killed = false;

View File

@ -3,6 +3,7 @@
#include <Storages/StorageMergeTree.h>
#include <Interpreters/TransactionLog.h>
#include <Common/ErrorCodes.h>
#include <Common/ProfileEventsScope.h>
#include <Core/Settings.h>
@ -110,7 +111,7 @@ bool MutatePlainMergeTreeTask::executeStep()
transaction.renameParts();
transaction.commit();
storage.updateMutationEntriesErrors(future_part, true, "");
storage.updateMutationEntriesErrors(future_part, true, "", "");
mutate_task->updateProfileEvents();
write_part_log({});
@ -124,7 +125,8 @@ bool MutatePlainMergeTreeTask::executeStep()
merge_mutate_entry->txn->onException();
PreformattedMessage exception_message = getCurrentExceptionMessageAndPattern(/* with_stacktrace */ false);
LOG_ERROR(getLogger("MutatePlainMergeTreeTask"), exception_message);
storage.updateMutationEntriesErrors(future_part, false, exception_message.text);
String error_code_name(ErrorCodes::getName(getCurrentExceptionCode()));
storage.updateMutationEntriesErrors(future_part, false, exception_message.text, error_code_name);
mutate_task->updateProfileEvents();
write_part_log(ExecutionStatus::fromCurrentException("", true));
tryLogCurrentException(__PRETTY_FUNCTION__);

View File

@ -4,6 +4,7 @@
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/ErrorCodes.h>
#include <Common/ProfileEventsScope.h>
@ -122,6 +123,7 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
status.latest_failed_part_info = source_part_info;
status.latest_fail_time = time(nullptr);
status.latest_fail_reason = getExceptionMessage(saved_exception, false);
status.latest_fail_error_code_name = ErrorCodes::getName(getCurrentExceptionCode());
if (result_data_version == it->first)
storage.mutation_backoff_policy.addPartMutationFailure(src_part, (*storage.getSettings())[MergeTreeSetting::max_postpone_time_for_failed_mutations_ms]);
}

View File

@ -491,6 +491,7 @@ void ReplicatedMergeTreeQueue::removeCoveredPartsFromMutations(const String & pa
status.latest_failed_part_info = MergeTreePartInfo();
status.latest_fail_time = 0;
status.latest_fail_reason.clear();
status.latest_fail_error_code_name.clear();
}
}
@ -2135,6 +2136,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
LOG_TRACE(log, "Marking mutation {} done because it is <= mutation_pointer ({})", znode, mutation_pointer);
mutation.is_done = true;
mutation.latest_fail_reason.clear();
mutation.latest_fail_error_code_name.clear();
alter_sequence.finishDataAlter(mutation.entry->alter_version, lock);
if (mutation.parts_to_do.size() != 0)
{
@ -2195,6 +2197,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
LOG_TRACE(log, "Mutation {} is done", entry->znode_name);
it->second.is_done = true;
it->second.latest_fail_reason.clear();
it->second.latest_fail_error_code_name.clear();
if (entry->isAlterMutation())
{
LOG_TRACE(log, "Finishing data alter with version {} for entry {}", entry->alter_version, entry->znode_name);
@ -2314,6 +2317,7 @@ std::optional<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getIncompleteMu
.latest_failed_part = status.latest_failed_part,
.latest_fail_time = status.latest_fail_time,
.latest_fail_reason = status.latest_fail_reason,
.latest_fail_error_code_name = status.latest_fail_error_code_name,
};
if (mutation_ids && !status.latest_fail_reason.empty())
@ -2361,6 +2365,7 @@ std::vector<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getMutationsStatu
status.latest_failed_part,
status.latest_fail_time,
status.latest_fail_reason,
status.latest_fail_error_code_name,
});
}
}

View File

@ -148,6 +148,7 @@ private:
MergeTreePartInfo latest_failed_part_info;
time_t latest_fail_time = 0;
String latest_fail_reason;
String latest_fail_error_code_name;
};
/// Mapping from znode path to Mutations Status

View File

@ -34,6 +34,7 @@
#include <Storages/PartitionCommands.h>
#include <Storages/buildQueryTreeForShard.h>
#include <fmt/core.h>
#include <Common/ErrorCodes.h>
#include <Common/Exception.h>
#include <Common/MemoryTracker.h>
#include <Common/ProfileEventsScope.h>
@ -92,6 +93,7 @@ namespace ErrorCodes
extern const int SUPPORT_IS_DISABLED;
extern const int TABLE_IS_READ_ONLY;
extern const int TOO_MANY_PARTS;
extern const int PART_IS_LOCKED;
}
namespace ActionLocks
@ -572,7 +574,7 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, Context
}
void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr result_part, bool is_successful, const String & exception_message)
void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr result_part, bool is_successful, const String & exception_message, const String & error_code_name)
{
/// Update the information about failed parts in the system.mutations table.
@ -597,6 +599,7 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
entry.latest_failed_part_info = MergeTreePartInfo();
entry.latest_fail_time = 0;
entry.latest_fail_reason.clear();
entry.latest_fail_error_code_name.clear();
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
mutation_backoff_policy.removePartFromFailed(failed_part->name);
@ -609,6 +612,7 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
entry.latest_failed_part_info = failed_part->info;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = exception_message;
entry.latest_fail_error_code_name = error_code_name;
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
{
@ -754,6 +758,7 @@ std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsS
{
result.latest_failed_part = mutation_entry.latest_failed_part;
result.latest_fail_reason = mutation_entry.latest_fail_reason;
result.latest_fail_error_code_name = mutation_entry.latest_fail_error_code_name;
result.latest_fail_time = mutation_entry.latest_fail_time;
/// Fill all mutations which failed with the same error
@ -776,6 +781,7 @@ std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsS
{
result.latest_failed_part = data_part->name;
result.latest_fail_reason = fmt::format("Serialization error: part {} is locked by transaction {}", data_part->name, part_locked);
result.latest_fail_error_code_name = ErrorCodes::getName(ErrorCodes::PART_IS_LOCKED);
result.latest_fail_time = time(nullptr);
}
}
@ -856,6 +862,7 @@ std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() cons
entry.latest_failed_part,
entry.latest_fail_time,
entry.latest_fail_reason,
entry.latest_fail_error_code_name,
});
}
}
@ -1348,6 +1355,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
MergeTreeMutationEntry & entry = it->second;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = getCurrentExceptionMessage(false);
entry.latest_fail_error_code_name = ErrorCodes::getName(getCurrentExceptionCode());
/// NOTE we should not skip mutations, because exception may be retryable (e.g. MEMORY_LIMIT_EXCEEDED)
break;
}

View File

@ -249,7 +249,7 @@ private:
/// Update mutation entries after part mutation execution. May reset old
/// errors if mutation was successful. Otherwise update latest_fail* fields
/// in mutation entries.
void updateMutationEntriesErrors(FutureMergedMutatedPartPtr result_part, bool is_successful, const String & exception_message);
void updateMutationEntriesErrors(FutureMergedMutatedPartPtr result_part, bool is_successful, const String & exception_message, const String & error_code_name);
/// Return empty optional if mutation was killed. Otherwise return partially
/// filled mutation status with information about error (latest_fail*) and

View File

@ -20,29 +20,30 @@ ColumnsDescription StorageSystemMutations::getColumnsDescription()
{
return ColumnsDescription
{
{ "database", std::make_shared<DataTypeString>(), "The name of the database to which the mutation was applied."},
{ "table", std::make_shared<DataTypeString>(), "The name of the table to which the mutation was applied."},
{ "mutation_id", std::make_shared<DataTypeString>(), "The ID of the mutation. For replicated tables these IDs correspond to znode names in the <table_path_in_clickhouse_keeper>/mutations/ directory in ClickHouse Keeper. For non-replicated tables the IDs correspond to file names in the data directory of the table."},
{ "command", std::make_shared<DataTypeString>(), "The mutation command string (the part of the query after ALTER TABLE [db.]table)."},
{ "create_time", std::make_shared<DataTypeDateTime>(), "Date and time when the mutation command was submitted for execution."},
{ "block_numbers.partition_id", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "For mutations of replicated tables, the array contains the partitions' IDs (one record for each partition). For mutations of non-replicated tables the array is empty."},
{ "block_numbers.number", std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt64>()),
{ "database", std::make_shared<DataTypeString>(), "The name of the database to which the mutation was applied."},
{ "table", std::make_shared<DataTypeString>(), "The name of the table to which the mutation was applied."},
{ "mutation_id", std::make_shared<DataTypeString>(), "The ID of the mutation. For replicated tables these IDs correspond to znode names in the <table_path_in_clickhouse_keeper>/mutations/ directory in ClickHouse Keeper. For non-replicated tables the IDs correspond to file names in the data directory of the table."},
{ "command", std::make_shared<DataTypeString>(), "The mutation command string (the part of the query after ALTER TABLE [db.]table)."},
{ "create_time", std::make_shared<DataTypeDateTime>(), "Date and time when the mutation command was submitted for execution."},
{ "block_numbers.partition_id", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "For mutations of replicated tables, the array contains the partitions' IDs (one record for each partition). For mutations of non-replicated tables the array is empty."},
{ "block_numbers.number", std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt64>()),
"For mutations of replicated tables, the array contains one record for each partition, with the block number that was acquired by the mutation. "
"Only parts that contain blocks with numbers less than this number will be mutated in the partition."
"In non-replicated tables, block numbers in all partitions form a single sequence. "
"This means that for mutations of non-replicated tables, the column will contain one record with a single block number acquired by the mutation."
},
{ "parts_to_do_names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "An array of names of data parts that need to be mutated for the mutation to complete."},
{ "parts_to_do", std::make_shared<DataTypeInt64>(), "The number of data parts that need to be mutated for the mutation to complete."},
{ "is_done", std::make_shared<DataTypeUInt8>(),
{ "parts_to_do_names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "An array of names of data parts that need to be mutated for the mutation to complete."},
{ "parts_to_do", std::make_shared<DataTypeInt64>(), "The number of data parts that need to be mutated for the mutation to complete."},
{ "is_done", std::make_shared<DataTypeUInt8>(),
"The flag whether the mutation is done or not. Possible values: "
"1 if the mutation is completed, "
"0 if the mutation is still in process. "
},
{ "is_killed", std::make_shared<DataTypeUInt8>(), "Only available in ClickHouse Cloud."},
{ "latest_failed_part", std::make_shared<DataTypeString>(), "The name of the most recent part that could not be mutated."},
{ "latest_fail_time", std::make_shared<DataTypeDateTime>(), "The date and time of the most recent part mutation failure."},
{ "latest_fail_reason", std::make_shared<DataTypeString>(), "The exception message that caused the most recent part mutation failure."},
{ "is_killed", std::make_shared<DataTypeUInt8>(), "Only available in ClickHouse Cloud."},
{ "latest_failed_part", std::make_shared<DataTypeString>(), "The name of the most recent part that could not be mutated."},
{ "latest_fail_time", std::make_shared<DataTypeDateTime>(), "The date and time of the most recent part mutation failure."},
{ "latest_fail_reason", std::make_shared<DataTypeString>(), "The exception message that caused the most recent part mutation failure."},
{ "latest_fail_error_code_name", std::make_shared<DataTypeString>(), "The error code of the exception that caused the most recent part mutation failure."},
};
}
@ -161,6 +162,7 @@ void StorageSystemMutations::fillData(MutableColumns & res_columns, ContextPtr c
res_columns[col_num++]->insert(status.latest_failed_part);
res_columns[col_num++]->insert(UInt64(status.latest_fail_time));
res_columns[col_num++]->insert(status.latest_fail_reason);
res_columns[col_num++]->insert(status.latest_fail_error_code_name);
}
}
}

View File

@ -413,7 +413,8 @@ CREATE TABLE system.mutations
`is_killed` UInt8,
`latest_failed_part` String,
`latest_fail_time` DateTime,
`latest_fail_reason` String
`latest_fail_reason` String,
`latest_fail_error_code_name` String
)
ENGINE = SystemMutations
COMMENT 'Contains a list of mutations and their progress. Each mutation command is represented by a single row.'