Fix deadlock and better code

This commit is contained in:
alesapin 2020-07-22 22:29:54 +03:00
parent 8c8bdd5070
commit 355150afce
3 changed files with 47 additions and 51 deletions

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/MergeTreeMutationStatus.h> #include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <boost/algorithm/string/join.hpp> #include <boost/algorithm/string/join.hpp>

View File

@ -284,9 +284,6 @@ struct CurrentlyMergingPartsTagger
FutureMergedMutatedPart future_part; FutureMergedMutatedPart future_part;
ReservationPtr reserved_space; ReservationPtr reserved_space;
bool is_successful = false;
String exception_message;
StorageMergeTree & storage; StorageMergeTree & storage;
public: public:
@ -339,44 +336,6 @@ public:
storage.currently_merging_mutating_parts.erase(part); storage.currently_merging_mutating_parts.erase(part);
} }
/// Update the information about failed parts in the system.mutations table.
Int64 sources_data_version = future_part.parts.at(0)->info.getDataVersion();
Int64 result_data_version = future_part.part_info.getDataVersion();
auto mutations_begin_it = storage.current_mutations_by_version.end();
auto mutations_end_it = storage.current_mutations_by_version.end();
if (sources_data_version != result_data_version)
{
mutations_begin_it = storage.current_mutations_by_version.upper_bound(sources_data_version);
mutations_end_it = storage.current_mutations_by_version.upper_bound(result_data_version);
}
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
MergeTreeMutationEntry & entry = it->second;
if (is_successful)
{
if (!entry.latest_failed_part.empty() && future_part.part_info.contains(entry.latest_failed_part_info))
{
entry.latest_failed_part.clear();
entry.latest_failed_part_info = MergeTreePartInfo();
entry.latest_fail_time = 0;
entry.latest_fail_reason.clear();
}
}
else
{
entry.latest_failed_part = future_part.parts.at(0)->name;
entry.latest_failed_part_info = future_part.parts.at(0)->info;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = exception_message;
{
std::lock_guard lock_mutation_wait(storage.mutation_wait_mutex);
storage.mutation_wait_event.notify_all();
}
}
}
storage.currently_processing_in_background_condition.notify_all(); storage.currently_processing_in_background_condition.notify_all();
} }
}; };
@ -402,6 +361,46 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String
return version; return version;
} }
void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPart result_part, bool is_successful, const String & exception_message)
{
/// Update the information about failed parts in the system.mutations table.
Int64 sources_data_version = result_part.parts.at(0)->info.getDataVersion();
Int64 result_data_version = result_part.part_info.getDataVersion();
if (sources_data_version != result_data_version)
{
std::lock_guard lock(currently_processing_in_background_mutex);
auto mutations_begin_it = current_mutations_by_version.upper_bound(sources_data_version);
auto mutations_end_it = current_mutations_by_version.upper_bound(result_data_version);
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
MergeTreeMutationEntry & entry = it->second;
if (is_successful)
{
if (!entry.latest_failed_part.empty() && result_part.part_info.contains(entry.latest_failed_part_info))
{
entry.latest_failed_part.clear();
entry.latest_failed_part_info = MergeTreePartInfo();
entry.latest_fail_time = 0;
entry.latest_fail_reason.clear();
}
}
else
{
entry.latest_failed_part = result_part.parts.at(0)->name;
entry.latest_failed_part_info = result_part.parts.at(0)->info;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = exception_message;
}
}
}
std::unique_lock lock(mutation_wait_mutex);
mutation_wait_event.notify_all();
}
void StorageMergeTree::waitForMutation(Int64 version, const String & file_name) void StorageMergeTree::waitForMutation(Int64 version, const String & file_name)
{ {
LOG_INFO(log, "Waiting mutation: {}", file_name); LOG_INFO(log, "Waiting mutation: {}", file_name);
@ -733,12 +732,10 @@ bool StorageMergeTree::merge(
merging_tagger->reserved_space, deduplicate, force_ttl); merging_tagger->reserved_space, deduplicate, force_ttl);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr); merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
merging_tagger->is_successful = true;
write_part_log({}); write_part_log({});
} }
catch (...) catch (...)
{ {
merging_tagger->exception_message = getCurrentExceptionMessage(false);
write_part_log(ExecutionStatus::fromCurrentException()); write_part_log(ExecutionStatus::fromCurrentException());
throw; throw;
} }
@ -875,18 +872,12 @@ bool StorageMergeTree::tryMutatePart()
renameTempPartAndReplace(new_part); renameTempPartAndReplace(new_part);
tagger->is_successful = true; updateMutationEntriesErrors(future_part, true, "");
write_part_log({}); write_part_log({});
/// Notify all, who wait for this or previous mutations
{
std::lock_guard lock(mutation_wait_mutex);
mutation_wait_event.notify_all();
}
} }
catch (...) catch (...)
{ {
tagger->exception_message = getCurrentExceptionMessage(false); updateMutationEntriesErrors(future_part, false, getCurrentExceptionMessage(false));
write_part_log(ExecutionStatus::fromCurrentException()); write_part_log(ExecutionStatus::fromCurrentException());
throw; throw;
} }

View File

@ -153,6 +153,10 @@ private:
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context); void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context);
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context); void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context);
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
/// Update mutation entries after part mutation execution. May reset old
/// errors if mutation was successful. Otherwise update last_failed* fields
/// in mutation entries.
void updateMutationEntriesErrors(FutureMergedMutatedPart result_part, bool is_successful, const String & exception_message);
/// Return empty optional if mutation was killed. Otherwise return partially /// Return empty optional if mutation was killed. Otherwise return partially
/// filled mutation status with information about error (latest_fail*) and /// filled mutation status with information about error (latest_fail*) and