save information about failed parts for ordinary MergeTree mutations [#CLICKHOUSE-3192]

This commit is contained in:
Alexey Zatelepin 2019-01-10 20:06:27 +03:00
parent 339bcc8ceb
commit b8ee63fa3a
4 changed files with 76 additions and 16 deletions

View File

@ -2,6 +2,7 @@
#include <Core/Types.h>
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
namespace DB
@ -19,6 +20,11 @@ struct MergeTreeMutationEntry
Int64 block_number = 0;
String latest_failed_part;
MergeTreePartInfo latest_failed_part_info;
time_t latest_fail_time = 0;
String latest_fail_reason;
/// Create a new entry and write it to a temporary file.
MergeTreeMutationEntry(MutationCommands commands_, const String & path_prefix_, Int64 tmp_number);
MergeTreeMutationEntry(const MergeTreeMutationEntry &) = delete;

View File

@ -19,6 +19,10 @@ struct MergeTreeMutationStatus
/// If the mutation is done. Note that in case of ReplicatedMergeTree parts_to_do == 0 doesn't imply is_done == true.
bool is_done = false;
String latest_failed_part;
time_t latest_fail_time = 0;
String latest_fail_reason;
};
}

View File

@ -253,37 +253,74 @@ void StorageMergeTree::alter(
/// Scope guard for a merge or a mutation of data parts.
/// While an instance exists, the source parts are registered in `storage.currently_merging`
/// and free space on the filesystem is reserved. On destruction the parts are unregistered and
/// the outcome (success or failure) is recorded in the mutation entries the operation covers.
/// It's possible to mark parts before.
/// NOTE(review): this listing appears to contain both the pre-change and the post-change variant
/// of several lines (e.g. `storage` is declared both as a pointer and as a reference, and
/// statements using `storage->` are followed by the same statement using `storage.`) — confirm
/// which variant is the live one before relying on any single line.
struct CurrentlyMergingPartsTagger
{
MergeTreeData::DataPartsVector parts;
/// Description of the part being produced: its source parts (`parts`) and the resulting `part_info`.
MergeTreeDataMergerMutator::FuturePart future_part;
/// Reservation obtained in the constructor via DiskSpaceMonitor::reserve.
DiskSpaceMonitor::ReservationPtr reserved_space;
StorageMergeTree * storage = nullptr;
CurrentlyMergingPartsTagger() = default;
/// Outcome flag read by the destructor. It stays false unless the owner sets it to true,
/// so an operation that exits via an exception is recorded as failed.
bool is_successful = false;
/// Text stored as `latest_fail_reason` of the affected mutation entries when `is_successful` is false.
String exception_message;
CurrentlyMergingPartsTagger(const MergeTreeData::DataPartsVector & parts_, size_t total_size, StorageMergeTree & storage_)
: parts(parts_), storage(&storage_)
StorageMergeTree & storage;
public:
/// Reserves `total_size` bytes at the storage path and tags every source part of `future_part_`.
/// Throws if the reservation fails, or with LOGICAL_ERROR if any of the parts is already tagged;
/// the check loop runs over all parts before any of them is inserted.
CurrentlyMergingPartsTagger(const MergeTreeDataMergerMutator::FuturePart & future_part_, size_t total_size, StorageMergeTree & storage_)
: future_part(future_part_), storage(storage_)
{
/// Assume mutex is already locked, because this method is called from mergeTask.
reserved_space = DiskSpaceMonitor::reserve(storage->full_path, total_size); /// May throw.
for (const auto & part : parts)
reserved_space = DiskSpaceMonitor::reserve(storage.full_path, total_size); /// May throw.
for (const auto & part : future_part.parts)
{
if (storage->currently_merging.count(part))
if (storage.currently_merging.count(part))
/// NOTE(review): the message below contains a typo ("alreagy"); it is runtime text and is left unchanged here.
throw Exception("Tagging alreagy tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
storage->currently_merging.insert(parts.begin(), parts.end());
storage.currently_merging.insert(future_part.parts.begin(), future_part.parts.end());
}
/// Untags the parts and updates the failure information of the affected mutation entries.
/// Unlike the constructor, the destructor takes `currently_merging_mutex` itself.
~CurrentlyMergingPartsTagger()
{
std::lock_guard lock(storage->currently_merging_mutex);
std::lock_guard lock(storage.currently_merging_mutex);
for (const auto & part : parts)
for (const auto & part : future_part.parts)
{
/// A part that was tagged by this object but is no longer registered means the bookkeeping is broken;
/// the process is terminated rather than continuing.
if (!storage->currently_merging.count(part))
if (!storage.currently_merging.count(part))
std::terminate();
storage->currently_merging.erase(part);
storage.currently_merging.erase(part);
}
/// Update the information about failed parts in the system.mutations table.
/// NOTE(review): assumes future_part.parts is non-empty — presumably at(0) throws on an empty
/// container, and that would happen inside a destructor; confirm that callers never pass an empty list.
Int64 sources_data_version = future_part.parts.at(0)->info.getDataVersion();
Int64 result_data_version = future_part.part_info.getDataVersion();
/// Both iterators start at end(), so the loop below does nothing when the data version is unchanged.
auto mutations_begin_it = storage.current_mutations_by_version.end();
auto mutations_end_it = storage.current_mutations_by_version.end();
if (sources_data_version != result_data_version)
{
/// Select the entries whose key is greater than the source version and not greater than the result version.
/// Presumably current_mutations_by_version is an ordered map keyed by mutation version — TODO confirm.
mutations_begin_it = storage.current_mutations_by_version.upper_bound(sources_data_version);
mutations_end_it = storage.current_mutations_by_version.upper_bound(result_data_version);
}
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
MergeTreeMutationEntry & entry = it->second;
if (is_successful)
{
/// Clear a previously recorded failure only if the resulting part contains the part that had failed;
/// failures recorded for other parts are kept.
if (!entry.latest_failed_part.empty() && future_part.part_info.contains(entry.latest_failed_part_info))
{
entry.latest_failed_part.clear();
entry.latest_failed_part_info = MergeTreePartInfo();
entry.latest_fail_time = 0;
entry.latest_fail_reason.clear();
}
}
else
{
/// Record the first source part as the failed one, together with the current time and the saved message.
/// Any earlier failure information in the entry is overwritten.
entry.latest_failed_part = future_part.parts.at(0)->name;
entry.latest_failed_part_info = future_part.parts.at(0)->info;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = exception_message;
}
}
}
};
@ -341,6 +378,9 @@ std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() cons
block_numbers_map,
parts_to_do,
(parts_to_do == 0),
entry.latest_failed_part,
entry.latest_fail_time,
entry.latest_fail_reason,
});
}
}
@ -411,7 +451,7 @@ bool StorageMergeTree::merge(
if (!selected)
return false;
merging_tagger.emplace(future_part.parts, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this);
merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this);
}
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(database_name, table_name, future_part.name, future_part.parts);
@ -469,10 +509,12 @@ bool StorageMergeTree::merge(
merging_tagger->reserved_space.get(), deduplicate);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
merging_tagger->is_successful = true;
write_part_log({});
}
catch (...)
{
merging_tagger->exception_message = getCurrentExceptionMessage(false);
write_part_log(ExecutionStatus::fromCurrentException());
throw;
}
@ -521,7 +563,7 @@ bool StorageMergeTree::tryMutatePart()
future_part.part_info = new_part_info;
future_part.name = part->getNewName(new_part_info);
tagger.emplace({part}, estimated_needed_space, *this);
tagger.emplace(future_part, estimated_needed_space, *this);
break;
}
}
@ -576,10 +618,12 @@ bool StorageMergeTree::tryMutatePart()
{
new_part = merger_mutator.mutatePartToTemporaryPart(future_part, commands, global_context);
data.renameTempPartAndReplace(new_part);
tagger->is_successful = true;
write_part_log({});
}
catch (...)
{
tagger->exception_message = getCurrentExceptionMessage(false);
write_part_log(ExecutionStatus::fromCurrentException());
throw;
}

View File

@ -26,6 +26,9 @@ NamesAndTypesList StorageSystemMutations::getNamesAndTypes()
{ "block_numbers.number", std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt64>()) },
{ "parts_to_do", std::make_shared<DataTypeInt64>() },
{ "is_done", std::make_shared<DataTypeUInt8>() },
{ "latest_failed_part", std::make_shared<DataTypeString>() },
{ "latest_fail_time", std::make_shared<DataTypeDateTime>() },
{ "latest_fail_reason", std::make_shared<DataTypeString>() },
};
}
@ -118,6 +121,9 @@ void StorageSystemMutations::fillData(MutableColumns & res_columns, const Contex
res_columns[col_num++]->insert(block_numbers);
res_columns[col_num++]->insert(status.parts_to_do);
res_columns[col_num++]->insert(status.is_done);
res_columns[col_num++]->insert(status.latest_failed_part);
res_columns[col_num++]->insert(UInt64(status.latest_fail_time));
res_columns[col_num++]->insert(status.latest_fail_reason);
}
}
}