kill ordinary MergeTree mutations [#CLICKHOUSE-3912]

This commit is contained in:
Alexey Zatelepin 2019-01-14 15:25:25 +03:00
parent 5832e474a4
commit ca93fd483a
5 changed files with 56 additions and 13 deletions

View File

@ -15,8 +15,10 @@ namespace DB
{
MergeListElement::MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part)
: database{database}, table{table}
, result_part_name{future_part.name}, num_parts{future_part.parts.size()}
: database{database}, table{table}, partition_id{future_part.part_info.partition_id}
, result_part_name{future_part.name}
, result_data_version{future_part.part_info.getDataVersion()}
, num_parts{future_part.parts.size()}
, thread_number{Poco::ThreadNumber::get()}
{
for (const auto & source_part : future_part.parts)
@ -31,8 +33,8 @@ MergeListElement::MergeListElement(const std::string & database, const std::stri
if (!future_part.parts.empty())
{
partition_id = future_part.parts[0]->info.partition_id;
is_mutation = future_part.parts[0]->info.getDataVersion() != future_part.part_info.getDataVersion();
source_data_version = future_part.parts[0]->info.getDataVersion();
is_mutation = (result_data_version != source_data_version);
}
/// Each merge is executed into separate background processing pool thread

View File

@ -51,13 +51,20 @@ struct MergeListElement : boost::noncopyable
{
const std::string database;
const std::string table;
const std::string result_part_name;
std::string partition_id;
bool is_mutation = false;
Stopwatch watch;
std::atomic<Float64> progress{};
const std::string result_part_name;
Int64 result_data_version{};
bool is_mutation{};
UInt64 num_parts{};
Names source_part_names;
Int64 source_data_version{};
Stopwatch watch;
std::atomic<Float64> progress{};
std::atomic<bool> is_cancelled{};
UInt64 total_size_bytes_compressed{};
UInt64 total_size_marks{};
std::atomic<UInt64> bytes_read_uncompressed{};
@ -138,6 +145,17 @@ public:
res.emplace_back(merge_element.getInfo());
return res;
}
void cancelMutation(Int64 mutation_version)
{
std::lock_guard lock{mutex};
for (auto & merge_element : merges)
{
if (merge_element.source_data_version < mutation_version
&& merge_element.result_data_version >= mutation_version)
merge_element.is_cancelled = true;
}
}
};

View File

@ -813,7 +813,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
{
auto check_not_cancelled = [&]()
{
if (actions_blocker.isCancelled())
if (actions_blocker.isCancelled() || merge_entry->is_cancelled)
throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);
return true;

View File

@ -336,7 +336,8 @@ void StorageMergeTree::mutate(const MutationCommands & commands, const Context &
Int64 version = increment.get();
entry.commit(version);
file_name = entry.file_name;
current_mutations_by_version.emplace(version, std::move(entry));
auto insertion = current_mutations_by_id.emplace(file_name, std::move(entry));
current_mutations_by_version.emplace(version, insertion.first->second);
}
LOG_INFO(log, "Added mutation: " << file_name);
@ -389,7 +390,26 @@ std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() cons
void StorageMergeTree::killMutation(const String & mutation_id)
{
LOG_TRACE(log, "KILL MUTATION " << mutation_id);
LOG_TRACE(log, "Killing mutation " << mutation_id);
std::optional<MergeTreeMutationEntry> to_kill;
{
std::lock_guard lock(currently_merging_mutex);
auto it = current_mutations_by_id.find(mutation_id);
if (it != current_mutations_by_id.end())
{
to_kill.emplace(std::move(it->second));
current_mutations_by_id.erase(it);
current_mutations_by_version.erase(to_kill->block_number);
}
}
if (to_kill)
{
global_context.getMergeList().cancelMutation(to_kill->block_number);
to_kill->removeFile();
LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id);
}
}
@ -402,7 +422,8 @@ void StorageMergeTree::loadMutations()
{
MergeTreeMutationEntry entry(full_path, it.name());
Int64 block_number = entry.block_number;
current_mutations_by_version.emplace(block_number, std::move(entry));
auto insertion = current_mutations_by_id.emplace(it.name(), std::move(entry));
current_mutations_by_version.emplace(block_number, insertion.first->second);
}
else if (startsWith(it.name(), "tmp_mutation_"))
{
@ -720,6 +741,7 @@ void StorageMergeTree::clearOldMutations()
for (size_t i = 0; i < to_delete_count; ++i)
{
mutations_to_delete.push_back(std::move(it->second));
current_mutations_by_id.erase(mutations_to_delete.back().file_name);
it = current_mutations_by_version.erase(it);
}
}

View File

@ -117,7 +117,8 @@ private:
mutable std::mutex currently_merging_mutex;
MergeTreeData::DataParts currently_merging;
std::multimap<Int64, MergeTreeMutationEntry> current_mutations_by_version;
std::map<String, MergeTreeMutationEntry> current_mutations_by_id;
std::multimap<Int64, MergeTreeMutationEntry &> current_mutations_by_version;
Logger * log;